Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a "transient" network connectivity state #2696

Merged
merged 18 commits into from
May 8, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions core/network/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ var DialPeerTimeout = 60 * time.Second
type noDialCtxKey struct{}
type dialPeerTimeoutCtxKey struct{}
type forceDirectDialCtxKey struct{}
type useTransientCtxKey struct{}
type allowLimitedConnCtxKey struct{}
type simConnectCtxKey struct{ isClient bool }

var noDial = noDialCtxKey{}
var forceDirectDial = forceDirectDialCtxKey{}
var useTransient = useTransientCtxKey{}
var allowLimitedConn = allowLimitedConnCtxKey{}
var simConnectIsServer = simConnectCtxKey{}
var simConnectIsClient = simConnectCtxKey{isClient: true}

Expand Down Expand Up @@ -94,15 +94,35 @@ func WithDialPeerTimeout(ctx context.Context, timeout time.Duration) context.Con
return context.WithValue(ctx, dialPeerTimeoutCtxKey{}, timeout)
}

// WithAllowLimitedConn constructs a new context with an option that instructs
// the network that it is acceptable to use a limited connection when opening a
// new stream.
func WithAllowLimitedConn(ctx context.Context, reason string) context.Context {
return context.WithValue(ctx, allowLimitedConn, reason)
}

// WithUseTransient constructs a new context with an option that instructs the network
// that it is acceptable to use a transient connection when opening a new stream.
//
// Deprecated: Use WithAllowLimitedConn instead.
func WithUseTransient(ctx context.Context, reason string) context.Context {
return context.WithValue(ctx, useTransient, reason)
return context.WithValue(ctx, allowLimitedConn, reason)
}

// GetAllowLimitedConn returns true if the allow limited conn option is set in the context.
func GetAllowLimitedConn(ctx context.Context) (usetransient bool, reason string) {
v := ctx.Value(allowLimitedConn)
if v != nil {
return true, v.(string)
}
return false, ""
}

// GetUseTransient returns true if the use transient option is set in the context.
//
// Deprecated: Use GetAllowLimitedConn instead.
func GetUseTransient(ctx context.Context) (usetransient bool, reason string) {
v := ctx.Value(useTransient)
v := ctx.Value(allowLimitedConn)
if v != nil {
return true, v.(string)
}
Expand Down
8 changes: 7 additions & 1 deletion core/network/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,13 @@ var ErrNoConn = errors.New("no usable connection to peer")

// ErrTransientConn is returned when attempting to open a stream to a peer with only a transient
// connection, without specifying the UseTransient option.
var ErrTransientConn = errors.New("transient connection to peer")
//
// Deprecated: Use ErrLimitedConn instead.
var ErrTransientConn = ErrLimitedConn

// ErrLimitedConn is returned when attempting to open a stream to a peer with only a conn
// connection, without specifying the AllowLimitedConn option.
var ErrLimitedConn = errors.New("limited connection to peer")

// ErrResourceLimitExceeded is returned when attempting to perform an operation that would
// exceed system resource limits.
Expand Down
15 changes: 12 additions & 3 deletions core/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,23 @@ const (
// Connected means has an open, live connection to peer
Connected

// Deprecated: CanConnect is deprecated and will be removed in a future release.
//
// CanConnect means recently connected to peer, terminated gracefully
CanConnect

// Deprecated: CannotConnect is deprecated and will be removed in a future release.
//
// CannotConnect means recently attempted connecting but failed to connect.
// (should signal "made effort, failed")
CannotConnect
guillaumemichel marked this conversation as resolved.
Show resolved Hide resolved

// Limited means we have a transient connection to the peer, but aren't fully connected.
Limited
)

func (c Connectedness) String() string {
str := [...]string{"NotConnected", "Connected", "CanConnect", "CannotConnect"}
str := [...]string{"NotConnected", "Connected", "CanConnect", "CannotConnect", "Limited"}
if c < 0 || int(c) >= len(str) {
return unrecognized
}
Expand Down Expand Up @@ -111,8 +118,10 @@ type Stats struct {
Direction Direction
// Opened is the timestamp when this connection was opened.
Opened time.Time
// Transient indicates that this connection is transient and may be closed soon.
Transient bool
// Limited indicates that this connection is Limited. It maybe limited by
// bytes or time. In practice, this is a connection formed over a circuit v2
// relay.
Limited bool
// Extra stores additional metadata about this connection.
Extra map[interface{}]interface{}
}
Expand Down
4 changes: 3 additions & 1 deletion p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -724,8 +724,10 @@ func (h *BasicHost) Connect(ctx context.Context, pi peer.AddrInfo) error {
h.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.TempAddrTTL)

forceDirect, _ := network.GetForceDirectDial(ctx)
canUseLimitedConn, _ := network.GetAllowLimitedConn(ctx)
if !forceDirect {
if h.Network().Connectedness(pi.ID) == network.Connected {
connectedness := h.Network().Connectedness(pi.ID)
if connectedness == network.Connected || (canUseLimitedConn && connectedness == network.Limited) {
return nil
}
}
Expand Down
7 changes: 4 additions & 3 deletions p2p/host/blank/blank.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@ func NewBlankHost(n network.Network, options ...Option) *BlankHost {
}

bh := &BlankHost{
n: n,
cmgr: cfg.cmgr,
mux: mstream.NewMultistreamMuxer[protocol.ID](),
n: n,
cmgr: cfg.cmgr,
mux: mstream.NewMultistreamMuxer[protocol.ID](),
eventbus: cfg.eventBus,
}
if bh.eventbus == nil {
bh.eventbus = eventbus.NewBus(eventbus.WithMetricsTracer(eventbus.NewMetricsTracer()))
Expand Down
13 changes: 7 additions & 6 deletions p2p/host/pstoremanager/pstoremanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,16 @@ func (m *PeerstoreManager) background(ctx context.Context, sub event.Subscriptio
ev := e.(event.EvtPeerConnectednessChanged)
p := ev.Peer
switch ev.Connectedness {
case network.NotConnected:
case network.Connected, network.Limited:
// If we reconnect to the peer before we've cleared the information,
// keep it. This is an optimization to keep the disconnected map
// small. We still need to check that a peer is actually
// disconnected before removing it from the peer store.
delete(disconnected, p)
default:
if _, ok := disconnected[p]; !ok {
disconnected[p] = time.Now()
}
case network.Connected:
// If we reconnect to the peer before we've cleared the information, keep it.
// This is an optimization to keep the disconnected map small.
// We still need to check that a peer is actually disconnected before removing it from the peer store.
delete(disconnected, p)
}
case <-ticker.C:
now := time.Now()
Expand Down
4 changes: 3 additions & 1 deletion p2p/host/routed/routed.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ func Wrap(h host.Host, r Routing) *RoutedHost {
func (rh *RoutedHost) Connect(ctx context.Context, pi peer.AddrInfo) error {
// first, check if we're already connected unless force direct dial.
forceDirect, _ := network.GetForceDirectDial(ctx)
canUseLimitedConn, _ := network.GetAllowLimitedConn(ctx)
if !forceDirect {
if rh.Network().Connectedness(pi.ID) == network.Connected {
connectedness := rh.Network().Connectedness(pi.ID)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally we wouldn't try to perform routing if we're just going to wait on an ongoing connection. But that's not a new issue.

if connectedness == network.Connected || (canUseLimitedConn && connectedness == network.Limited) {
return nil
}
}
Expand Down
143 changes: 143 additions & 0 deletions p2p/net/swarm/connectedness_event_emitter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package swarm

import (
"context"
"sync"

"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
)

// connectednessEventEmitter emits PeerConnectednessChanged events.
// We ensure that for any peer we connected to we always sent atleast 1 NotConnected Event after
// the peer disconnects. This is because peers can observe a connection before they are notified
// of the connection by a peer connectedness changed event.
type connectednessEventEmitter struct {
mx sync.RWMutex
// newConns is the channel that holds the peerIDs we recently connected to
newConns chan peer.ID
removeConnsMx sync.Mutex
// removeConns is a slice of peerIDs we have recently closed connections to
removeConns []peer.ID
// lastEvent is the last connectedness event sent for a particular peer.
lastEvent map[peer.ID]network.Connectedness
// connectedness is the function that gives the peers current connectedness state
connectedness func(peer.ID) network.Connectedness
// emitter is the PeerConnectednessChanged event emitter
emitter event.Emitter
wg sync.WaitGroup
removeConnNotif chan struct{}
ctx context.Context
cancel context.CancelFunc
}

func newConnectednessEventEmitter(connectedness func(peer.ID) network.Connectedness, emitter event.Emitter) *connectednessEventEmitter {
ctx, cancel := context.WithCancel(context.Background())
c := &connectednessEventEmitter{
newConns: make(chan peer.ID, 32),
lastEvent: make(map[peer.ID]network.Connectedness),
removeConnNotif: make(chan struct{}, 1),
connectedness: connectedness,
emitter: emitter,
ctx: ctx,
cancel: cancel,
}
c.wg.Add(1)
go c.runEmitter()
return c
}

func (c *connectednessEventEmitter) AddConn(p peer.ID) {
c.mx.RLock()
defer c.mx.RUnlock()
if c.ctx.Err() != nil {
return
}

c.newConns <- p
sukunrt marked this conversation as resolved.
Show resolved Hide resolved
}

func (c *connectednessEventEmitter) RemoveConn(p peer.ID) {
c.mx.RLock()
defer c.mx.RUnlock()
if c.ctx.Err() != nil {
return
}

c.removeConnsMx.Lock()
// This queue is not unbounded since we block in the AddConn method
// So we are adding connections to the swarm only at a rate
// the subscriber for our peer connectedness changed events can consume them.
// If a lot of open connections are closed at once, increasing the disconnected
// event notification rate, the rate of adding connections to the swarm would
// proportionately reduce, which would eventually reduce the length of this slice.
sukunrt marked this conversation as resolved.
Show resolved Hide resolved
c.removeConns = append(c.removeConns, p)
c.removeConnsMx.Unlock()

select {
case c.removeConnNotif <- struct{}{}:
default:
}
}

func (c *connectednessEventEmitter) Close() {
c.cancel()
c.wg.Wait()
}

func (c *connectednessEventEmitter) runEmitter() {
defer c.wg.Done()
for {
select {
case p := <-c.newConns:
c.notifyPeer(p, true)
case <-c.removeConnNotif:
c.sendConnRemovedNotifications()
case <-c.ctx.Done():
c.mx.Lock() // Wait for all pending AddConn & RemoveConn operations to complete
defer c.mx.Unlock()
for {
select {
case p := <-c.newConns:
c.notifyPeer(p, true)
case <-c.removeConnNotif:
c.sendConnRemovedNotifications()
default:
return
}
}
}
}
}

func (c *connectednessEventEmitter) notifyPeer(p peer.ID, forceNotConnectedEvent bool) {
sukunrt marked this conversation as resolved.
Show resolved Hide resolved
oldState := c.lastEvent[p]
c.lastEvent[p] = c.connectedness(p)
if c.lastEvent[p] == network.NotConnected {
delete(c.lastEvent, p)
}
if (forceNotConnectedEvent && c.lastEvent[p] == network.NotConnected) || c.lastEvent[p] != oldState {
c.emitter.Emit(event.EvtPeerConnectednessChanged{
Peer: p,
Connectedness: c.lastEvent[p],
})
}
}

func (c *connectednessEventEmitter) sendConnRemovedNotifications() {
c.removeConnsMx.Lock()
defer c.removeConnsMx.Unlock()
for {
if len(c.removeConns) == 0 {
return
}
p := c.removeConns[0]
c.removeConns[0] = ""
c.removeConns = c.removeConns[1:]

c.removeConnsMx.Unlock()
c.notifyPeer(p, false)
c.removeConnsMx.Lock()
}
sukunrt marked this conversation as resolved.
Show resolved Hide resolved
}
2 changes: 1 addition & 1 deletion p2p/net/swarm/dial_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ loop:
ad.expectedTCPUpgradeTime = time.Time{}
if res.Conn != nil {
// we got a connection, add it to the swarm
conn, err := w.s.addConn(res.Conn, network.DirOutbound)
conn, err := w.s.addConn(ad.ctx, res.Conn, network.DirOutbound)
if err != nil {
// oops no, we failed to add it to the swarm
res.Conn.Close()
Expand Down
Loading
Loading