From 374ec488b4bf6e3ecf4468a5f3cc679449a10113 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Mon, 29 Jan 2024 14:44:25 -0800 Subject: [PATCH 01/18] Add a "transient" network connectivity state Previously, we'd consider "transiently" connected peers to be connected. This meant: 1. We wouldn't fire a second event when transitioning to "really connected". The only option for users was to listen on the old-style per-connection notifications. 2. "Connectedness" checks would be a little too eager to treat a peer as connected. For 99% of users, "transient" peers should be treated as disconnected. So while it's technically a breaking change to split-out "transient" connectivity into a separate state, I expect it's more likely to fix bugs than anything. Unfortunately, this change _did_ require several changes to go-libp2p itself because go-libp2p _does_ care about transient connections: 1. We want to keep peerstore information for transient peers. 2. We may sometimes want to treat peers as "connected" in the host. 3. Identify still needs to run over transient connections. fixes #2692 --- core/network/network.go | 5 ++- p2p/host/basic/basic_host.go | 4 ++- p2p/host/pstoremanager/pstoremanager.go | 13 ++++---- p2p/host/routed/routed.go | 4 ++- p2p/net/swarm/swarm.go | 42 ++++++++++++++++++++----- p2p/protocol/identify/id.go | 17 +++++----- 6 files changed, 61 insertions(+), 24 deletions(-) diff --git a/core/network/network.go b/core/network/network.go index 66b0a1cd34..9148fc3309 100644 --- a/core/network/network.go +++ b/core/network/network.go @@ -61,10 +61,13 @@ const ( // CannotConnect means recently attempted connecting but failed to connect. // (should signal "made effort, failed") CannotConnect + + // Transient means we have a transient connection to the peer, but aren't fully connected. + Transient ) func (c Connectedness) String() string { - str := [...]string{"NotConnected", "Connected", "CanConnect", "CannotConnect"} + str := [...]string{"NotConnected", "Connected", "CanConnect", "CannotConnect", "Transient"} if c < 0 || int(c) >= len(str) { return unrecognized } diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index 367fca05f2..4ac3737097 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -724,8 +724,10 @@ func (h *BasicHost) Connect(ctx context.Context, pi peer.AddrInfo) error { h.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.TempAddrTTL) forceDirect, _ := network.GetForceDirectDial(ctx) + canUseTransient, _ := network.GetUseTransient(ctx) if !forceDirect { - if h.Network().Connectedness(pi.ID) == network.Connected { + connectedness := h.Network().Connectedness(pi.ID) + if connectedness == network.Connected || (canUseTransient && connectedness == network.Transient) { return nil } } diff --git a/p2p/host/pstoremanager/pstoremanager.go b/p2p/host/pstoremanager/pstoremanager.go index 2a22b2caee..82c55e8cbf 100644 --- a/p2p/host/pstoremanager/pstoremanager.go +++ b/p2p/host/pstoremanager/pstoremanager.go @@ -103,15 +103,16 @@ func (m *PeerstoreManager) background(ctx context.Context, sub event.Subscriptio ev := e.(event.EvtPeerConnectednessChanged) p := ev.Peer switch ev.Connectedness { - case network.NotConnected: + case network.Connected, network.Transient: + // If we reconnect to the peer before we've cleared the information, + // keep it. This is an optimization to keep the disconnected map + // small. We still need to check that a peer is actually + // disconnected before removing it from the peer store. 
+ delete(disconnected, p) + default: if _, ok := disconnected[p]; !ok { disconnected[p] = time.Now() } - case network.Connected: - // If we reconnect to the peer before we've cleared the information, keep it. - // This is an optimization to keep the disconnected map small. - // We still need to check that a peer is actually disconnected before removing it from the peer store. - delete(disconnected, p) } case <-ticker.C: now := time.Now() diff --git a/p2p/host/routed/routed.go b/p2p/host/routed/routed.go index eb8e58ee7f..a6e43703c9 100644 --- a/p2p/host/routed/routed.go +++ b/p2p/host/routed/routed.go @@ -48,8 +48,10 @@ func Wrap(h host.Host, r Routing) *RoutedHost { func (rh *RoutedHost) Connect(ctx context.Context, pi peer.AddrInfo) error { // first, check if we're already connected unless force direct dial. forceDirect, _ := network.GetForceDirectDial(ctx) + canUseTransient, _ := network.GetUseTransient(ctx) if !forceDirect { - if rh.Network().Connectedness(pi.ID) == network.Connected { + connectedness := rh.Network().Connectedness(pi.ID) + if connectedness == network.Connected || (canUseTransient && connectedness == network.Transient) { return nil } } diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index 0140c3f596..eb50072beb 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -350,6 +350,7 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, } stat.Direction = dir stat.Opened = time.Now() + isTransient := stat.Transient // Wrap and register the connection. c := &Conn{ @@ -389,8 +390,9 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, return nil, ErrSwarmClosed } + oldState := s.connectednessUnlocked(p) + c.streams.m = make(map[*Stream]struct{}) - isFirstConnection := len(s.conns.m[p]) == 0 s.conns.m[p] = append(s.conns.m[p], c) // Add two swarm refs: @@ -403,8 +405,12 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, c.notifyLk.Lock() s.conns.Unlock() - // Notify goroutines waiting for a direct connection - if !c.Stat().Transient { + newState := network.Transient + if !isTransient { + newState = network.Connected + + // Notify goroutines waiting for a direct connection + // // Go routines interested in waiting for direct connection first acquire this lock // and then acquire s.conns.RLock. Do not acquire this lock before conns.Unlock to // prevent deadlock. @@ -418,10 +424,10 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, // Emit event after releasing `s.conns` lock so that a consumer can still // use swarm methods that need the `s.conns` lock. - if isFirstConnection { + if oldState != newState { s.emitter.Emit(event.EvtPeerConnectednessChanged{ Peer: p, - Connectedness: network.Connected, + Connectedness: newState, }) } @@ -652,10 +658,30 @@ func isDirectConn(c *Conn) bool { // To check if we have an open connection, use `s.Connectedness(p) == // network.Connected`. func (s *Swarm) Connectedness(p peer.ID) network.Connectedness { - if s.bestConnToPeer(p) != nil { - return network.Connected + s.conns.RLock() + defer s.conns.RUnlock() + + return s.connectednessUnlocked(p) +} + +func (s *Swarm) connectednessUnlocked(p peer.ID) network.Connectedness { + var haveTransient bool + for _, c := range s.conns.m[p] { + if c.conn.IsClosed() { + // We *will* garbage collect this soon anyways. 
+ continue + } + if c.Stat().Transient { + haveTransient = true + } else { + return network.Connected + } + } + if haveTransient { + return network.Transient + } else { + return network.NotConnected } - return network.NotConnected } // Conns returns a slice of all connections. diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go index 4bb4a59243..51db0d8067 100644 --- a/p2p/protocol/identify/id.go +++ b/p2p/protocol/identify/id.go @@ -752,7 +752,8 @@ func (ids *idService) consumeMessage(mes *pb.Identify, c network.Conn, isPush bo // Taking the lock ensures that we don't concurrently process a disconnect. ids.addrMu.Lock() ttl := peerstore.RecentlyConnectedAddrTTL - if ids.Host.Network().Connectedness(p) == network.Connected { + switch ids.Host.Network().Connectedness(p) { + case network.Transient, network.Connected: ttl = peerstore.ConnectedAddrTTL } @@ -980,13 +981,15 @@ func (nn *netNotifiee) Disconnected(_ network.Network, c network.Conn) { delete(ids.conns, c) ids.connsMu.Unlock() - if ids.Host.Network().Connectedness(c.RemotePeer()) != network.Connected { - // Last disconnect. - // Undo the setting of addresses to peer.ConnectedAddrTTL we did - ids.addrMu.Lock() - defer ids.addrMu.Unlock() - ids.Host.Peerstore().UpdateAddrs(c.RemotePeer(), peerstore.ConnectedAddrTTL, peerstore.RecentlyConnectedAddrTTL) + switch ids.Host.Network().Connectedness(c.RemotePeer()) { + case network.Connected, network.Transient: + return } + // Last disconnect. + // Undo the setting of addresses to peer.ConnectedAddrTTL we did + ids.addrMu.Lock() + defer ids.addrMu.Unlock() + ids.Host.Peerstore().UpdateAddrs(c.RemotePeer(), peerstore.ConnectedAddrTTL, peerstore.RecentlyConnectedAddrTTL) } func (nn *netNotifiee) Listen(n network.Network, a ma.Multiaddr) {} From 14024ef58cae8e8ec414f674ed8547731a163669 Mon Sep 17 00:00:00 2001 From: guillaumemichel Date: Thu, 21 Mar 2024 16:42:26 +0100 Subject: [PATCH 02/18] rename Transient to Limited and addressed review --- core/network/network.go | 10 +++++++--- p2p/host/basic/basic_host.go | 2 +- p2p/host/pstoremanager/pstoremanager.go | 2 +- p2p/host/routed/routed.go | 2 +- p2p/net/swarm/swarm.go | 10 ++++++---- p2p/protocol/identify/id.go | 4 ++-- 6 files changed, 18 insertions(+), 12 deletions(-) diff --git a/core/network/network.go b/core/network/network.go index 9148fc3309..1c8e620645 100644 --- a/core/network/network.go +++ b/core/network/network.go @@ -55,19 +55,23 @@ const ( // Connected means has an open, live connection to peer Connected + // Deprecated: CanConnect is deprecated and will be removed in a future release. + // // CanConnect means recently connected to peer, terminated gracefully CanConnect + // Deprecated: CannotConnect is deprecated and will be removed in a future release. + // // CannotConnect means recently attempted connecting but failed to connect. // (should signal "made effort, failed") CannotConnect - // Transient means we have a transient connection to the peer, but aren't fully connected. - Transient + // Limited means we have a transient connection to the peer, but aren't fully connected. 
+ Limited ) func (c Connectedness) String() string { - str := [...]string{"NotConnected", "Connected", "CanConnect", "CannotConnect", "Transient"} + str := [...]string{"NotConnected", "Connected", "CanConnect", "CannotConnect", "Limited"} if c < 0 || int(c) >= len(str) { return unrecognized } diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index 4ac3737097..29ac0e0c77 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -727,7 +727,7 @@ func (h *BasicHost) Connect(ctx context.Context, pi peer.AddrInfo) error { canUseTransient, _ := network.GetUseTransient(ctx) if !forceDirect { connectedness := h.Network().Connectedness(pi.ID) - if connectedness == network.Connected || (canUseTransient && connectedness == network.Transient) { + if connectedness == network.Connected || (canUseTransient && connectedness == network.Limited) { return nil } } diff --git a/p2p/host/pstoremanager/pstoremanager.go b/p2p/host/pstoremanager/pstoremanager.go index 82c55e8cbf..93cc2a98d9 100644 --- a/p2p/host/pstoremanager/pstoremanager.go +++ b/p2p/host/pstoremanager/pstoremanager.go @@ -103,7 +103,7 @@ func (m *PeerstoreManager) background(ctx context.Context, sub event.Subscriptio ev := e.(event.EvtPeerConnectednessChanged) p := ev.Peer switch ev.Connectedness { - case network.Connected, network.Transient: + case network.Connected, network.Limited: // If we reconnect to the peer before we've cleared the information, // keep it. This is an optimization to keep the disconnected map // small. We still need to check that a peer is actually diff --git a/p2p/host/routed/routed.go b/p2p/host/routed/routed.go index a6e43703c9..ccab506b5c 100644 --- a/p2p/host/routed/routed.go +++ b/p2p/host/routed/routed.go @@ -51,7 +51,7 @@ func (rh *RoutedHost) Connect(ctx context.Context, pi peer.AddrInfo) error { canUseTransient, _ := network.GetUseTransient(ctx) if !forceDirect { connectedness := rh.Network().Connectedness(pi.ID) - if connectedness == network.Connected || (canUseTransient && connectedness == network.Transient) { + if connectedness == network.Connected || (canUseTransient && connectedness == network.Limited) { return nil } } diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index eb50072beb..386fb53a61 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -395,6 +395,11 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, c.streams.m = make(map[*Stream]struct{}) s.conns.m[p] = append(s.conns.m[p], c) + newState := network.Limited + if !isTransient { + newState = network.Connected + } + // Add two swarm refs: // * One will be decremented after the close notifications fire in Conn.doClose // * The other will be decremented when Conn.start exits. 
@@ -405,10 +410,7 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, c.notifyLk.Lock() s.conns.Unlock() - newState := network.Transient if !isTransient { - newState = network.Connected - // Notify goroutines waiting for a direct connection // // Go routines interested in waiting for direct connection first acquire this lock @@ -678,7 +680,7 @@ func (s *Swarm) connectednessUnlocked(p peer.ID) network.Connectedness { } } if haveTransient { - return network.Transient + return network.Limited } else { return network.NotConnected } diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go index 51db0d8067..66da984531 100644 --- a/p2p/protocol/identify/id.go +++ b/p2p/protocol/identify/id.go @@ -753,7 +753,7 @@ func (ids *idService) consumeMessage(mes *pb.Identify, c network.Conn, isPush bo ids.addrMu.Lock() ttl := peerstore.RecentlyConnectedAddrTTL switch ids.Host.Network().Connectedness(p) { - case network.Transient, network.Connected: + case network.Limited, network.Connected: ttl = peerstore.ConnectedAddrTTL } @@ -982,7 +982,7 @@ func (nn *netNotifiee) Disconnected(_ network.Network, c network.Conn) { ids.connsMu.Unlock() switch ids.Host.Network().Connectedness(c.RemotePeer()) { - case network.Connected, network.Transient: + case network.Connected, network.Limited: return } // Last disconnect. From e3215d94d5ed8a2fc67d70dfdccdbb568030474a Mon Sep 17 00:00:00 2001 From: guillaumemichel Date: Fri, 22 Mar 2024 08:49:55 +0100 Subject: [PATCH 03/18] updated removeConn --- p2p/net/swarm/swarm.go | 44 +++++++++++++++++++----------------------- 1 file changed, 20 insertions(+), 24 deletions(-) diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index 386fb53a61..0231755492 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -394,11 +394,7 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, c.streams.m = make(map[*Stream]struct{}) s.conns.m[p] = append(s.conns.m[p], c) - - newState := network.Limited - if !isTransient { - newState = network.Connected - } + newState := s.connectednessUnlocked(p) // Add two swarm refs: // * One will be decremented after the close notifications fire in Conn.doClose @@ -782,31 +778,31 @@ func (s *Swarm) removeConn(c *Conn) { cs := s.conns.m[p] + oldState := s.connectednessUnlocked(p) if len(cs) == 1 { delete(s.conns.m, p) - s.conns.Unlock() + } else { + for i, ci := range cs { + if ci == c { + // NOTE: We're intentionally preserving order. + // This way, connections to a peer are always + // sorted oldest to newest. + copy(cs[i:], cs[i+1:]) + cs[len(cs)-1] = nil + s.conns.m[p] = cs[:len(cs)-1] + break + } + } + } + newState := s.connectednessUnlocked(p) - // Emit event after releasing `s.conns` lock so that a consumer can still - // use swarm methods that need the `s.conns` lock. + s.conns.Unlock() + + if oldState != newState { s.emitter.Emit(event.EvtPeerConnectednessChanged{ Peer: p, - Connectedness: network.NotConnected, + Connectedness: newState, }) - return - } - - defer s.conns.Unlock() - - for i, ci := range cs { - if ci == c { - // NOTE: We're intentionally preserving order. - // This way, connections to a peer are always - // sorted oldest to newest. 
- copy(cs[i:], cs[i+1:]) - cs[len(cs)-1] = nil - s.conns.m[p] = cs[:len(cs)-1] - break - } } } From 5ee3942e48cea2d870d70be9a4b61d577664a583 Mon Sep 17 00:00:00 2001 From: guillaumemichel Date: Fri, 22 Mar 2024 10:28:23 +0100 Subject: [PATCH 04/18] corrected removeConn --- p2p/net/swarm/swarm.go | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index 0231755492..1661100bf2 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -779,19 +779,27 @@ func (s *Swarm) removeConn(c *Conn) { cs := s.conns.m[p] oldState := s.connectednessUnlocked(p) + if len(cs) == 1 { delete(s.conns.m, p) - } else { - for i, ci := range cs { - if ci == c { - // NOTE: We're intentionally preserving order. - // This way, connections to a peer are always - // sorted oldest to newest. - copy(cs[i:], cs[i+1:]) - cs[len(cs)-1] = nil - s.conns.m[p] = cs[:len(cs)-1] - break - } + s.conns.Unlock() + + s.emitter.Emit(event.EvtPeerConnectednessChanged{ + Peer: p, + Connectedness: network.NotConnected, + }) + return + } + + for i, ci := range cs { + if ci == c { + // NOTE: We're intentionally preserving order. + // This way, connections to a peer are always + // sorted oldest to newest. + copy(cs[i:], cs[i+1:]) + cs[len(cs)-1] = nil + s.conns.m[p] = cs[:len(cs)-1] + break } } newState := s.connectednessUnlocked(p) From bed69dc64cf171f209f28e811173fc138ab4264f Mon Sep 17 00:00:00 2001 From: guillaumemichel Date: Fri, 22 Mar 2024 10:30:03 +0100 Subject: [PATCH 05/18] restored comment --- p2p/net/swarm/swarm.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index 1661100bf2..c7f0844ef9 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -784,6 +784,8 @@ func (s *Swarm) removeConn(c *Conn) { delete(s.conns.m, p) s.conns.Unlock() + // Emit event after releasing `s.conns` lock so that a consumer can still + // use swarm methods that need the `s.conns` lock. s.emitter.Emit(event.EvtPeerConnectednessChanged{ Peer: p, Connectedness: network.NotConnected, From 678d8b4ad1e9cdd7cb0a99f93708e2a7c9cd5feb Mon Sep 17 00:00:00 2001 From: guillaumemichel Date: Fri, 22 Mar 2024 16:21:16 +0100 Subject: [PATCH 06/18] updated connectednessUnlocked --- p2p/net/swarm/swarm.go | 36 ++++++++++++------------------------ 1 file changed, 12 insertions(+), 24 deletions(-) diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index c7f0844ef9..becd6f4dbe 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -665,10 +665,6 @@ func (s *Swarm) Connectedness(p peer.ID) network.Connectedness { func (s *Swarm) connectednessUnlocked(p peer.ID) network.Connectedness { var haveTransient bool for _, c := range s.conns.m[p] { - if c.conn.IsClosed() { - // We *will* garbage collect this soon anyways. - continue - } if c.Stat().Transient { haveTransient = true } else { @@ -782,28 +778,20 @@ func (s *Swarm) removeConn(c *Conn) { if len(cs) == 1 { delete(s.conns.m, p) - s.conns.Unlock() - - // Emit event after releasing `s.conns` lock so that a consumer can still - // use swarm methods that need the `s.conns` lock. - s.emitter.Emit(event.EvtPeerConnectednessChanged{ - Peer: p, - Connectedness: network.NotConnected, - }) - return - } - - for i, ci := range cs { - if ci == c { - // NOTE: We're intentionally preserving order. - // This way, connections to a peer are always - // sorted oldest to newest. 
- copy(cs[i:], cs[i+1:]) - cs[len(cs)-1] = nil - s.conns.m[p] = cs[:len(cs)-1] - break + } else { + for i, ci := range cs { + if ci == c { + // NOTE: We're intentionally preserving order. + // This way, connections to a peer are always + // sorted oldest to newest. + copy(cs[i:], cs[i+1:]) + cs[len(cs)-1] = nil + s.conns.m[p] = cs[:len(cs)-1] + break + } } } + newState := s.connectednessUnlocked(p) s.conns.Unlock() From 9b9556a9659691f408e7893a6345f70b0d668f93 Mon Sep 17 00:00:00 2001 From: sukun Date: Fri, 22 Mar 2024 21:50:07 +0530 Subject: [PATCH 07/18] add option to consider closed connections for connectedness --- p2p/net/swarm/swarm.go | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index becd6f4dbe..6eeaa6baa2 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -390,11 +390,11 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, return nil, ErrSwarmClosed } - oldState := s.connectednessUnlocked(p) + oldState := s.connectednessUnlocked(p, true) c.streams.m = make(map[*Stream]struct{}) s.conns.m[p] = append(s.conns.m[p], c) - newState := s.connectednessUnlocked(p) + newState := s.connectednessUnlocked(p, true) // Add two swarm refs: // * One will be decremented after the close notifications fire in Conn.doClose @@ -659,12 +659,20 @@ func (s *Swarm) Connectedness(p peer.ID) network.Connectedness { s.conns.RLock() defer s.conns.RUnlock() - return s.connectednessUnlocked(p) + return s.connectednessUnlocked(p, false) } -func (s *Swarm) connectednessUnlocked(p peer.ID) network.Connectedness { +// connectednessUnlocked returns the connectedness of a peer. For sending peer connectedness +// changed notifications consider closed connections. When remote closes a connection +// the underlying transport connection is closed first, so tracking changes to the connectedness +// state requires considering this recently closed connections impact on Connectedness. 
+func (s *Swarm) connectednessUnlocked(p peer.ID, considerClosed bool) network.Connectedness { var haveTransient bool for _, c := range s.conns.m[p] { + if !considerClosed && c.IsClosed() { + // These will be garbage collected soon + continue + } if c.Stat().Transient { haveTransient = true } else { @@ -774,7 +782,7 @@ func (s *Swarm) removeConn(c *Conn) { cs := s.conns.m[p] - oldState := s.connectednessUnlocked(p) + oldState := s.connectednessUnlocked(p, true) if len(cs) == 1 { delete(s.conns.m, p) @@ -792,7 +800,7 @@ func (s *Swarm) removeConn(c *Conn) { } } - newState := s.connectednessUnlocked(p) + newState := s.connectednessUnlocked(p, true) s.conns.Unlock() From 84bc0b1a815b27ee678a3385c4e28d0e8981f2fc Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Wed, 24 Apr 2024 16:19:43 -0700 Subject: [PATCH 08/18] More renaming --- core/network/context.go | 28 ++++++++++++--- core/network/errors.go | 8 ++++- core/network/network.go | 6 ++-- p2p/host/basic/basic_host.go | 4 +-- p2p/host/routed/routed.go | 4 +-- p2p/net/swarm/swarm.go | 40 +++++++++++----------- p2p/net/swarm/swarm_conn.go | 2 +- p2p/protocol/circuitv2/client/dial.go | 2 +- p2p/protocol/circuitv2/client/handlers.go | 2 +- p2p/protocol/circuitv2/relay/relay_test.go | 6 ++-- 10 files changed, 65 insertions(+), 37 deletions(-) diff --git a/core/network/context.go b/core/network/context.go index 7fabfb53e0..75db775932 100644 --- a/core/network/context.go +++ b/core/network/context.go @@ -13,12 +13,12 @@ var DialPeerTimeout = 60 * time.Second type noDialCtxKey struct{} type dialPeerTimeoutCtxKey struct{} type forceDirectDialCtxKey struct{} -type useTransientCtxKey struct{} +type allowLimitedConnCtxKey struct{} type simConnectCtxKey struct{ isClient bool } var noDial = noDialCtxKey{} var forceDirectDial = forceDirectDialCtxKey{} -var useTransient = useTransientCtxKey{} +var allowLimitedConn = allowLimitedConnCtxKey{} var simConnectIsServer = simConnectCtxKey{} var simConnectIsClient = simConnectCtxKey{isClient: true} @@ -94,15 +94,35 @@ func WithDialPeerTimeout(ctx context.Context, timeout time.Duration) context.Con return context.WithValue(ctx, dialPeerTimeoutCtxKey{}, timeout) } +// WithAllowLimitedConn constructs a new context with an option that instructs +// the network that it is acceptable to use a limited connection when opening a +// new stream. +func WithAllowLimitedConn(ctx context.Context, reason string) context.Context { + return context.WithValue(ctx, allowLimitedConn, reason) +} + // WithUseTransient constructs a new context with an option that instructs the network // that it is acceptable to use a transient connection when opening a new stream. +// +// Deprecated: Use WithAllowLimitedConn instead. func WithUseTransient(ctx context.Context, reason string) context.Context { - return context.WithValue(ctx, useTransient, reason) + return context.WithValue(ctx, allowLimitedConn, reason) +} + +// GetAllowLimitedConn returns true if the allow limited conn option is set in the context. +func GetAllowLimitedConn(ctx context.Context) (usetransient bool, reason string) { + v := ctx.Value(allowLimitedConn) + if v != nil { + return true, v.(string) + } + return false, "" } // GetUseTransient returns true if the use transient option is set in the context. +// +// Deprecated: Use GetAllowLimitedConn instead. 
func GetUseTransient(ctx context.Context) (usetransient bool, reason string) { - v := ctx.Value(useTransient) + v := ctx.Value(allowLimitedConn) if v != nil { return true, v.(string) } diff --git a/core/network/errors.go b/core/network/errors.go index 03bb90c266..0f98cd5a28 100644 --- a/core/network/errors.go +++ b/core/network/errors.go @@ -22,7 +22,13 @@ var ErrNoConn = errors.New("no usable connection to peer") // ErrTransientConn is returned when attempting to open a stream to a peer with only a transient // connection, without specifying the UseTransient option. -var ErrTransientConn = errors.New("transient connection to peer") +// +// Deprecated: Use ErrLimitedConn instead. +var ErrTransientConn = ErrLimitedConn + +// ErrLimitedConn is returned when attempting to open a stream to a peer with only a conn +// connection, without specifying the AllowLimitedConn option. +var ErrLimitedConn = errors.New("limited connection to peer") // ErrResourceLimitExceeded is returned when attempting to perform an operation that would // exceed system resource limits. diff --git a/core/network/network.go b/core/network/network.go index 1c8e620645..22efbf235d 100644 --- a/core/network/network.go +++ b/core/network/network.go @@ -118,8 +118,10 @@ type Stats struct { Direction Direction // Opened is the timestamp when this connection was opened. Opened time.Time - // Transient indicates that this connection is transient and may be closed soon. - Transient bool + // Limited indicates that this connection is Limited. It maybe limited by + // bytes or time. In practice, this is a connection formed over a circuit v2 + // relay. + Limited bool // Extra stores additional metadata about this connection. Extra map[interface{}]interface{} } diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index 29ac0e0c77..d80445e32e 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -724,10 +724,10 @@ func (h *BasicHost) Connect(ctx context.Context, pi peer.AddrInfo) error { h.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.TempAddrTTL) forceDirect, _ := network.GetForceDirectDial(ctx) - canUseTransient, _ := network.GetUseTransient(ctx) + canUseLimitedConn, _ := network.GetAllowLimitedConn(ctx) if !forceDirect { connectedness := h.Network().Connectedness(pi.ID) - if connectedness == network.Connected || (canUseTransient && connectedness == network.Limited) { + if connectedness == network.Connected || (canUseLimitedConn && connectedness == network.Limited) { return nil } } diff --git a/p2p/host/routed/routed.go b/p2p/host/routed/routed.go index ccab506b5c..8248e50f0e 100644 --- a/p2p/host/routed/routed.go +++ b/p2p/host/routed/routed.go @@ -48,10 +48,10 @@ func Wrap(h host.Host, r Routing) *RoutedHost { func (rh *RoutedHost) Connect(ctx context.Context, pi peer.AddrInfo) error { // first, check if we're already connected unless force direct dial. 
forceDirect, _ := network.GetForceDirectDial(ctx) - canUseTransient, _ := network.GetUseTransient(ctx) + canUseLimitedConn, _ := network.GetAllowLimitedConn(ctx) if !forceDirect { connectedness := rh.Network().Connectedness(pi.ID) - if connectedness == network.Connected || (canUseTransient && connectedness == network.Limited) { + if connectedness == network.Connected || (canUseLimitedConn && connectedness == network.Limited) { return nil } } diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index 6eeaa6baa2..f13875b5d4 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -350,7 +350,7 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, } stat.Direction = dir stat.Opened = time.Now() - isTransient := stat.Transient + isLimited := stat.Limited // Wrap and register the connection. c := &Conn{ @@ -406,7 +406,7 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, c.notifyLk.Lock() s.conns.Unlock() - if !isTransient { + if !isLimited { // Notify goroutines waiting for a direct connection // // Go routines interested in waiting for direct connection first acquire this lock @@ -459,14 +459,14 @@ func (s *Swarm) StreamHandler() network.StreamHandler { // NewStream creates a new stream on any available connection to peer, dialing // if necessary. -// Use network.WithUseTransient to open a stream over a transient(relayed) +// Use network.WithAllowLimitedConn to open a stream over a limited(relayed) // connection. func (s *Swarm) NewStream(ctx context.Context, p peer.ID) (network.Stream, error) { log.Debugf("[%s] opening stream to peer [%s]", s.local, p) // Algorithm: // 1. Find the best connection, otherwise, dial. - // 2. If the best connection is transient, wait for a direct conn via conn + // 2. If the best connection is limited, wait for a direct conn via conn // reversal or hole punching. // 3. Try opening a stream. // 4. If the underlying connection is, in fact, closed, close the outer @@ -495,8 +495,8 @@ func (s *Swarm) NewStream(ctx context.Context, p peer.ID) (network.Stream, error } } - useTransient, _ := network.GetUseTransient(ctx) - if !useTransient && c.Stat().Transient { + limitedAllowed, _ := network.GetAllowLimitedConn(ctx) + if !limitedAllowed && c.Stat().Limited { var err error c, err = s.waitForDirectConn(ctx, p) if err != nil { @@ -522,12 +522,12 @@ func (s *Swarm) waitForDirectConn(ctx context.Context, p peer.ID) (*Conn, error) if c == nil { s.directConnNotifs.Unlock() return nil, network.ErrNoConn - } else if !c.Stat().Transient { + } else if !c.Stat().Limited { s.directConnNotifs.Unlock() return c, nil } - // Wait for transient connection to upgrade to a direct connection either by + // Wait for limited connection to upgrade to a direct connection either by // connection reversal or hole punching. ch := make(chan struct{}) s.directConnNotifs.m[p] = append(s.directConnNotifs.m[p], ch) @@ -559,8 +559,8 @@ func (s *Swarm) waitForDirectConn(ctx context.Context, p peer.ID) (*Conn, error) if c == nil { return nil, network.ErrNoConn } - if c.Stat().Transient { - return nil, network.ErrTransientConn + if c.Stat().Limited { + return nil, network.ErrLimitedConn } return c, nil } @@ -581,11 +581,11 @@ func (s *Swarm) ConnsToPeer(p peer.ID) []network.Conn { } func isBetterConn(a, b *Conn) bool { - // If one is transient and not the other, prefer the non-transient connection. 
- aTransient := a.Stat().Transient - bTransient := b.Stat().Transient - if aTransient != bTransient { - return !aTransient + // If one is limited and not the other, prefer the unlimited connection. + aLimited := a.Stat().Limited + bLimited := b.Stat().Limited + if aLimited != bLimited { + return !aLimited } // If one is direct and not the other, prefer the direct connection. @@ -636,7 +636,7 @@ func (s *Swarm) bestConnToPeer(p peer.ID) *Conn { // bestAcceptableConnToPeer returns the best acceptable connection, considering the passed in ctx. // If network.WithForceDirectDial is used, it only returns a direct connections, ignoring -// any transient (relayed) connections to the peer. +// any limited (relayed) connections to the peer. func (s *Swarm) bestAcceptableConnToPeer(ctx context.Context, p peer.ID) *Conn { conn := s.bestConnToPeer(p) @@ -667,19 +667,19 @@ func (s *Swarm) Connectedness(p peer.ID) network.Connectedness { // the underlying transport connection is closed first, so tracking changes to the connectedness // state requires considering this recently closed connections impact on Connectedness. func (s *Swarm) connectednessUnlocked(p peer.ID, considerClosed bool) network.Connectedness { - var haveTransient bool + var haveLimited bool for _, c := range s.conns.m[p] { if !considerClosed && c.IsClosed() { // These will be garbage collected soon continue } - if c.Stat().Transient { - haveTransient = true + if c.Stat().Limited { + haveLimited = true } else { return network.Connected } } - if haveTransient { + if haveLimited { return network.Limited } else { return network.NotConnected diff --git a/p2p/net/swarm/swarm_conn.go b/p2p/net/swarm/swarm_conn.go index 8c3ce7c5ac..5fe759483b 100644 --- a/p2p/net/swarm/swarm_conn.go +++ b/p2p/net/swarm/swarm_conn.go @@ -193,7 +193,7 @@ func (c *Conn) Stat() network.ConnStats { // NewStream returns a new Stream from this connection func (c *Conn) NewStream(ctx context.Context) (network.Stream, error) { - if c.Stat().Transient { + if c.Stat().Limited { if useTransient, _ := network.GetUseTransient(ctx); !useTransient { return nil, network.ErrTransientConn } diff --git a/p2p/protocol/circuitv2/client/dial.go b/p2p/protocol/circuitv2/client/dial.go index ecf5d3a51a..271652506a 100644 --- a/p2p/protocol/circuitv2/client/dial.go +++ b/p2p/protocol/circuitv2/client/dial.go @@ -179,7 +179,7 @@ func (c *Client) connect(s network.Stream, dest peer.AddrInfo) (*Conn, error) { // relay connection and we mark the connection as transient. var stat network.ConnStats if limit := msg.GetLimit(); limit != nil { - stat.Transient = true + stat.Limited = true stat.Extra = make(map[interface{}]interface{}) stat.Extra[StatLimitDuration] = time.Duration(limit.GetDuration()) * time.Second stat.Extra[StatLimitData] = limit.GetData() diff --git a/p2p/protocol/circuitv2/client/handlers.go b/p2p/protocol/circuitv2/client/handlers.go index 6b5361b123..9c36de0e89 100644 --- a/p2p/protocol/circuitv2/client/handlers.go +++ b/p2p/protocol/circuitv2/client/handlers.go @@ -67,7 +67,7 @@ func (c *Client) handleStreamV2(s network.Stream) { // relay connection and we mark the connection as transient. 
var stat network.ConnStats if limit := msg.GetLimit(); limit != nil { - stat.Transient = true + stat.Limited = true stat.Extra = make(map[interface{}]interface{}) stat.Extra[StatLimitDuration] = time.Duration(limit.GetDuration()) * time.Second stat.Extra[StatLimitData] = limit.GetData() diff --git a/p2p/protocol/circuitv2/relay/relay_test.go b/p2p/protocol/circuitv2/relay/relay_test.go index a229fe4aac..8bd05a0b3a 100644 --- a/p2p/protocol/circuitv2/relay/relay_test.go +++ b/p2p/protocol/circuitv2/relay/relay_test.go @@ -154,7 +154,7 @@ func TestBasicRelay(t *testing.T) { if len(conns) != 1 { t.Fatalf("expected 1 connection, but got %d", len(conns)) } - if !conns[0].Stat().Transient { + if !conns[0].Stat().Limited { t.Fatal("expected transient connection") } @@ -229,7 +229,7 @@ func TestRelayLimitTime(t *testing.T) { if len(conns) != 1 { t.Fatalf("expected 1 connection, but got %d", len(conns)) } - if !conns[0].Stat().Transient { + if !conns[0].Stat().Limited { t.Fatal("expected transient connection") } @@ -315,7 +315,7 @@ func TestRelayLimitData(t *testing.T) { if len(conns) != 1 { t.Fatalf("expected 1 connection, but got %d", len(conns)) } - if !conns[0].Stat().Transient { + if !conns[0].Stat().Limited { t.Fatal("expected transient connection") } From 98191a4348c787d540edf3e2144dc7824aa07a59 Mon Sep 17 00:00:00 2001 From: guillaumemichel Date: Thu, 25 Apr 2024 14:32:22 +0200 Subject: [PATCH 09/18] replaced GetUseTransient with GetAllowLimitedConn --- p2p/net/swarm/swarm_conn.go | 4 ++-- p2p/protocol/circuitv2/relay/relay_test.go | 6 +++--- p2p/protocol/holepunch/holepunch_test.go | 2 +- p2p/protocol/holepunch/holepuncher.go | 2 +- p2p/protocol/identify/id.go | 2 +- p2p/protocol/ping/ping.go | 2 +- p2p/test/basichost/basic_host_test.go | 2 +- p2p/test/swarm/swarm_test.go | 6 +++--- 8 files changed, 13 insertions(+), 13 deletions(-) diff --git a/p2p/net/swarm/swarm_conn.go b/p2p/net/swarm/swarm_conn.go index 5fe759483b..17ae1dffae 100644 --- a/p2p/net/swarm/swarm_conn.go +++ b/p2p/net/swarm/swarm_conn.go @@ -194,8 +194,8 @@ func (c *Conn) Stat() network.ConnStats { // NewStream returns a new Stream from this connection func (c *Conn) NewStream(ctx context.Context) (network.Stream, error) { if c.Stat().Limited { - if useTransient, _ := network.GetUseTransient(ctx); !useTransient { - return nil, network.ErrTransientConn + if useLimited, _ := network.GetAllowLimitedConn(ctx); !useLimited { + return nil, network.ErrLimitedConn } } diff --git a/p2p/protocol/circuitv2/relay/relay_test.go b/p2p/protocol/circuitv2/relay/relay_test.go index 8bd05a0b3a..db6d1146ce 100644 --- a/p2p/protocol/circuitv2/relay/relay_test.go +++ b/p2p/protocol/circuitv2/relay/relay_test.go @@ -158,7 +158,7 @@ func TestBasicRelay(t *testing.T) { t.Fatal("expected transient connection") } - s, err := hosts[2].NewStream(network.WithUseTransient(ctx, "test"), hosts[0].ID(), "test") + s, err := hosts[2].NewStream(network.WithAllowLimitedConn(ctx, "test"), hosts[0].ID(), "test") if err != nil { t.Fatal(err) } @@ -233,7 +233,7 @@ func TestRelayLimitTime(t *testing.T) { t.Fatal("expected transient connection") } - s, err := hosts[2].NewStream(network.WithUseTransient(ctx, "test"), hosts[0].ID(), "test") + s, err := hosts[2].NewStream(network.WithAllowLimitedConn(ctx, "test"), hosts[0].ID(), "test") if err != nil { t.Fatal(err) } @@ -319,7 +319,7 @@ func TestRelayLimitData(t *testing.T) { t.Fatal("expected transient connection") } - s, err := hosts[2].NewStream(network.WithUseTransient(ctx, "test"), hosts[0].ID(), "test") + s, err 
:= hosts[2].NewStream(network.WithAllowLimitedConn(ctx, "test"), hosts[0].ID(), "test") if err != nil { t.Fatal(err) } diff --git a/p2p/protocol/holepunch/holepunch_test.go b/p2p/protocol/holepunch/holepunch_test.go index 1f3d9263df..23593c7970 100644 --- a/p2p/protocol/holepunch/holepunch_test.go +++ b/p2p/protocol/holepunch/holepunch_test.go @@ -339,7 +339,7 @@ func TestFailuresOnResponder(t *testing.T) { defer h2.Close() defer relay.Close() - s, err := h2.NewStream(network.WithUseTransient(context.Background(), "holepunch"), h1.ID(), holepunch.Protocol) + s, err := h2.NewStream(network.WithAllowLimitedConn(context.Background(), "holepunch"), h1.ID(), holepunch.Protocol) require.NoError(t, err) go tc.initiator(s) diff --git a/p2p/protocol/holepunch/holepuncher.go b/p2p/protocol/holepunch/holepuncher.go index b651bd7822..479376ef09 100644 --- a/p2p/protocol/holepunch/holepuncher.go +++ b/p2p/protocol/holepunch/holepuncher.go @@ -174,7 +174,7 @@ func (hp *holePuncher) directConnect(rp peer.ID) error { // initiateHolePunch opens a new hole punching coordination stream, // exchanges the addresses and measures the RTT. func (hp *holePuncher) initiateHolePunch(rp peer.ID) ([]ma.Multiaddr, []ma.Multiaddr, time.Duration, error) { - hpCtx := network.WithUseTransient(hp.ctx, "hole-punch") + hpCtx := network.WithAllowLimitedConn(hp.ctx, "hole-punch") sCtx := network.WithNoDial(hpCtx, "hole-punch") str, err := hp.host.NewStream(sCtx, rp, Protocol) diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go index 66da984531..7ae4feb935 100644 --- a/p2p/protocol/identify/id.go +++ b/p2p/protocol/identify/id.go @@ -408,7 +408,7 @@ func (ids *idService) IdentifyWait(c network.Conn) <-chan struct{} { func (ids *idService) identifyConn(c network.Conn) error { ctx, cancel := context.WithTimeout(context.Background(), Timeout) defer cancel() - s, err := c.NewStream(network.WithUseTransient(ctx, "identify")) + s, err := c.NewStream(network.WithAllowLimitedConn(ctx, "identify")) if err != nil { log.Debugw("error opening identify stream", "peer", c.RemotePeer(), "error", err) return err diff --git a/p2p/protocol/ping/ping.go b/p2p/protocol/ping/ping.go index 1c78229084..9a67715593 100644 --- a/p2p/protocol/ping/ping.go +++ b/p2p/protocol/ping/ping.go @@ -111,7 +111,7 @@ func pingError(err error) chan Result { // Ping pings the remote peer until the context is canceled, returning a stream // of RTTs or errors. 
func Ping(ctx context.Context, h host.Host, p peer.ID) <-chan Result { - s, err := h.NewStream(network.WithUseTransient(ctx, "ping"), p, ID) + s, err := h.NewStream(network.WithAllowLimitedConn(ctx, "ping"), p, ID) if err != nil { return pingError(err) } diff --git a/p2p/test/basichost/basic_host_test.go b/p2p/test/basichost/basic_host_test.go index e6cd7ea9d9..9cd442dbf0 100644 --- a/p2p/test/basichost/basic_host_test.go +++ b/p2p/test/basichost/basic_host_test.go @@ -77,7 +77,7 @@ func TestNoStreamOverTransientConnection(t *testing.T) { require.Error(t, err) - _, err = h1.NewStream(network.WithUseTransient(context.Background(), "test"), h2.ID(), "/testprotocol") + _, err = h1.NewStream(network.WithAllowLimitedConn(context.Background(), "test"), h2.ID(), "/testprotocol") require.NoError(t, err) } diff --git a/p2p/test/swarm/swarm_test.go b/p2p/test/swarm/swarm_test.go index 8027cebe53..10298f5139 100644 --- a/p2p/test/swarm/swarm_test.go +++ b/p2p/test/swarm/swarm_test.go @@ -110,16 +110,16 @@ func TestNewStreamTransientConnection(t *testing.T) { h1.Peerstore().AddAddr(h2.ID(), relayaddr, peerstore.TempAddrTTL) - // WithUseTransient should succeed + // WithAllowLimitedConn should succeed ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) defer cancel() - ctx = network.WithUseTransient(ctx, "test") + ctx = network.WithAllowLimitedConn(ctx, "test") s, err := h1.Network().NewStream(ctx, h2.ID()) require.NoError(t, err) require.NotNil(t, s) defer s.Close() - // Without WithUseTransient should fail with context deadline exceeded + // Without WithAllowLimitedConn should fail with context deadline exceeded ctx, cancel = context.WithTimeout(context.Background(), 200*time.Millisecond) defer cancel() s, err = h1.Network().NewStream(ctx, h2.ID()) From d52e90067b70b28bf3c1895cee5a3e5c9c29f964 Mon Sep 17 00:00:00 2001 From: sukun Date: Thu, 25 Apr 2024 20:39:01 +0530 Subject: [PATCH 10/18] improve connectedness check --- p2p/net/swarm/swarm.go | 45 +++++++++++++++++++++++++++++++----------- 1 file changed, 34 insertions(+), 11 deletions(-) diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index f13875b5d4..dc955afd4d 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -390,11 +390,11 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, return nil, ErrSwarmClosed } - oldState := s.connectednessUnlocked(p, true) + oldState := s.connectednessUnlocked(p) c.streams.m = make(map[*Stream]struct{}) s.conns.m[p] = append(s.conns.m[p], c) - newState := s.connectednessUnlocked(p, true) + newState := s.connectednessUnlocked(p) // Add two swarm refs: // * One will be decremented after the close notifications fire in Conn.doClose @@ -659,17 +659,38 @@ func (s *Swarm) Connectedness(p peer.ID) network.Connectedness { s.conns.RLock() defer s.conns.RUnlock() - return s.connectednessUnlocked(p, false) + return s.connectednessUnlocked(p) } -// connectednessUnlocked returns the connectedness of a peer. For sending peer connectedness -// changed notifications consider closed connections. When remote closes a connection -// the underlying transport connection is closed first, so tracking changes to the connectedness -// state requires considering this recently closed connections impact on Connectedness. -func (s *Swarm) connectednessUnlocked(p peer.ID, considerClosed bool) network.Connectedness { +// connectednessUnlocked returns the connectedness of a peer. 
+func (s *Swarm) connectednessUnlocked(p peer.ID) network.Connectedness { var haveLimited bool for _, c := range s.conns.m[p] { - if !considerClosed && c.IsClosed() { + if c.IsClosed() { + // These will be garbage collected soon + continue + } + if c.Stat().Limited { + haveLimited = true + } else { + return network.Connected + } + } + if haveLimited { + return network.Limited + } else { + return network.NotConnected + } +} + +// connectednessWithClosedConnUnlocked returns connectedness to peer assuming that closedConn is still +// connected. +// This is useful when the peer closes the Connection and we are interested in knowing what the old +// connectedness before closing of the connection was. +func (s *Swarm) connectednessWithClosedConnUnlocked(p peer.ID, closedConn *Conn) network.Connectedness { + var haveLimited bool + for _, c := range s.conns.m[p] { + if c.IsClosed() && c != closedConn { // These will be garbage collected soon continue } @@ -782,7 +803,9 @@ func (s *Swarm) removeConn(c *Conn) { cs := s.conns.m[p] - oldState := s.connectednessUnlocked(p, true) + // To get an accurate oldState we need to consider the impact of the recently + // closed connection on connectedness + oldState := s.connectednessWithClosedConnUnlocked(p, c) if len(cs) == 1 { delete(s.conns.m, p) @@ -800,7 +823,7 @@ func (s *Swarm) removeConn(c *Conn) { } } - newState := s.connectednessUnlocked(p, true) + newState := s.connectednessUnlocked(p) s.conns.Unlock() From eb65e0f85799ffc4197045cbf5673ba114dab6ec Mon Sep 17 00:00:00 2001 From: sukun Date: Tue, 30 Apr 2024 21:53:05 +0530 Subject: [PATCH 11/18] store connectedness state on swarm --- p2p/net/swarm/swarm.go | 48 +++++++++++++----------------------------- 1 file changed, 15 insertions(+), 33 deletions(-) diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index dc955afd4d..89744a2419 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -156,7 +156,8 @@ type Swarm struct { conns struct { sync.RWMutex - m map[peer.ID][]*Conn + m map[peer.ID][]*Conn + connectedness map[peer.ID]network.Connectedness } listeners struct { @@ -234,6 +235,7 @@ func NewSwarm(local peer.ID, peers peerstore.Peerstore, eventBus event.Bus, opts } s.conns.m = make(map[peer.ID][]*Conn) + s.conns.connectedness = make(map[peer.ID]network.Connectedness) s.listeners.m = make(map[transport.Listener]struct{}) s.transports.m = make(map[int]transport.Transport) s.notifs.m = make(map[network.Notifiee]struct{}) @@ -282,6 +284,7 @@ func (s *Swarm) close() { s.conns.Lock() conns := s.conns.m s.conns.m = nil + s.conns.connectedness = nil s.conns.Unlock() // Lots of goroutines but we might as well do this in parallel. We want to shut down as fast as @@ -390,11 +393,12 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, return nil, ErrSwarmClosed } - oldState := s.connectednessUnlocked(p) + oldState := s.conns.connectedness[p] c.streams.m = make(map[*Stream]struct{}) s.conns.m[p] = append(s.conns.m[p], c) newState := s.connectednessUnlocked(p) + s.conns.connectedness[p] = newState // Add two swarm refs: // * One will be decremented after the close notifications fire in Conn.doClose @@ -678,33 +682,8 @@ func (s *Swarm) connectednessUnlocked(p peer.ID) network.Connectedness { } if haveLimited { return network.Limited - } else { - return network.NotConnected - } -} - -// connectednessWithClosedConnUnlocked returns connectedness to peer assuming that closedConn is still -// connected. 
-// This is useful when the peer closes the Connection and we are interested in knowing what the old -// connectedness before closing of the connection was. -func (s *Swarm) connectednessWithClosedConnUnlocked(p peer.ID, closedConn *Conn) network.Connectedness { - var haveLimited bool - for _, c := range s.conns.m[p] { - if c.IsClosed() && c != closedConn { - // These will be garbage collected soon - continue - } - if c.Stat().Limited { - haveLimited = true - } else { - return network.Connected - } - } - if haveLimited { - return network.Limited - } else { - return network.NotConnected } + return network.NotConnected } // Conns returns a slice of all connections. @@ -801,12 +780,8 @@ func (s *Swarm) removeConn(c *Conn) { s.conns.Lock() + oldState := s.conns.connectedness[p] cs := s.conns.m[p] - - // To get an accurate oldState we need to consider the impact of the recently - // closed connection on connectedness - oldState := s.connectednessWithClosedConnUnlocked(p, c) - if len(cs) == 1 { delete(s.conns.m, p) } else { @@ -824,6 +799,13 @@ func (s *Swarm) removeConn(c *Conn) { } newState := s.connectednessUnlocked(p) + if s.conns.connectedness != nil { // swarm is not closing + if newState == network.NotConnected { + delete(s.conns.connectedness, p) + } else { + s.conns.connectedness[p] = newState + } + } s.conns.Unlock() From 89a84426b59140fa615f64298a859ddc4d448452 Mon Sep 17 00:00:00 2001 From: sukun Date: Tue, 30 Apr 2024 22:42:50 +0530 Subject: [PATCH 12/18] check for limited connectivity event --- p2p/host/blank/blank.go | 7 +++--- p2p/protocol/circuitv2/relay/relay_test.go | 28 ++++++++++++++++++++-- 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/p2p/host/blank/blank.go b/p2p/host/blank/blank.go index 0fdded30ff..0cf6642f69 100644 --- a/p2p/host/blank/blank.go +++ b/p2p/host/blank/blank.go @@ -63,9 +63,10 @@ func NewBlankHost(n network.Network, options ...Option) *BlankHost { } bh := &BlankHost{ - n: n, - cmgr: cfg.cmgr, - mux: mstream.NewMultistreamMuxer[protocol.ID](), + n: n, + cmgr: cfg.cmgr, + mux: mstream.NewMultistreamMuxer[protocol.ID](), + eventbus: cfg.eventBus, } if bh.eventbus == nil { bh.eventbus = eventbus.NewBus(eventbus.WithMetricsTracer(eventbus.NewMetricsTracer())) diff --git a/p2p/protocol/circuitv2/relay/relay_test.go b/p2p/protocol/circuitv2/relay/relay_test.go index db6d1146ce..e5d32b0c96 100644 --- a/p2p/protocol/circuitv2/relay/relay_test.go +++ b/p2p/protocol/circuitv2/relay/relay_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/metrics" "github.com/libp2p/go-libp2p/core/network" @@ -23,6 +24,7 @@ import ( "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client" "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay" "github.com/libp2p/go-libp2p/p2p/transport/tcp" + "github.com/stretchr/testify/require" ma "github.com/multiformats/go-multiaddr" ) @@ -49,7 +51,8 @@ func getNetHosts(t *testing.T, ctx context.Context, n int) (hosts []host.Host, u } bwr := metrics.NewBandwidthCounter() - netw, err := swarm.NewSwarm(p, ps, eventbus.NewBus(), swarm.WithMetrics(bwr)) + bus := eventbus.NewBus() + netw, err := swarm.NewSwarm(p, ps, bus, swarm.WithMetrics(bwr)) if err != nil { t.Fatal(err) } @@ -70,7 +73,7 @@ func getNetHosts(t *testing.T, ctx context.Context, n int) (hosts []host.Host, u t.Fatal(err) } - h := bhost.NewBlankHost(netw) + h := bhost.NewBlankHost(netw, bhost.WithEventBus(bus)) hosts = 
append(hosts, h) } @@ -145,10 +148,31 @@ func TestBasicRelay(t *testing.T) { t.Fatal(err) } + sub, err := hosts[2].EventBus().Subscribe(new(event.EvtPeerConnectednessChanged)) + require.NoError(t, err) + err = hosts[2].Connect(ctx, peer.AddrInfo{ID: hosts[0].ID(), Addrs: []ma.Multiaddr{raddr}}) if err != nil { t.Fatal(err) } + for { + var e interface{} + select { + case e = <-sub.Out(): + case <-time.After(2 * time.Second): + t.Fatal("expected limited connectivity event") + } + evt, ok := e.(event.EvtPeerConnectednessChanged) + if !ok { + t.Fatalf("invalid event: %s", e) + } + if evt.Peer == hosts[0].ID() { + if evt.Connectedness != network.Limited { + t.Fatalf("expected limited connectivity %s", evt.Connectedness) + } + break + } + } conns := hosts[2].Network().ConnsToPeer(hosts[0].ID()) if len(conns) != 1 { From ac195c5c7c1a934e0c1896e6391f737c645078c4 Mon Sep 17 00:00:00 2001 From: sukun Date: Thu, 2 May 2024 16:08:27 +0530 Subject: [PATCH 13/18] close emitter after the events are sent --- p2p/net/swarm/swarm.go | 13 +++-- p2p/net/swarm/swarm_event_test.go | 88 +++++++++++++++++++++++++++++++ 2 files changed, 96 insertions(+), 5 deletions(-) diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index 89744a2419..bd05ebe7f4 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -273,8 +273,6 @@ func (s *Swarm) Done() <-chan struct{} { func (s *Swarm) close() { s.ctxCancel() - s.emitter.Close() - // Prevents new connections and/or listeners from being added to the swarm. s.listeners.Lock() listeners := s.listeners.m @@ -284,7 +282,6 @@ func (s *Swarm) close() { s.conns.Lock() conns := s.conns.m s.conns.m = nil - s.conns.connectedness = nil s.conns.Unlock() // Lots of goroutines but we might as well do this in parallel. We want to shut down as fast as @@ -311,6 +308,13 @@ func (s *Swarm) close() { // Wait for everything to finish. s.refs.Wait() + s.emitter.Close() + + // Remove the connectedness map only after we have closed the connection and sent all the disconnection + // events + s.conns.Lock() + s.conns.connectedness = nil + s.conns.Unlock() // Now close out any transports (if necessary). Do this after closing // all connections/listeners. 
@@ -799,14 +803,13 @@ func (s *Swarm) removeConn(c *Conn) { } newState := s.connectednessUnlocked(p) - if s.conns.connectedness != nil { // swarm is not closing + if s.conns.connectedness != nil { // This shoud always be non nil but a check doesn't hurt if newState == network.NotConnected { delete(s.conns.connectedness, p) } else { s.conns.connectedness[p] = newState } } - s.conns.Unlock() if oldState != newState { diff --git a/p2p/net/swarm/swarm_event_test.go b/p2p/net/swarm/swarm_event_test.go index 86d698d611..8d2b2d79ce 100644 --- a/p2p/net/swarm/swarm_event_test.go +++ b/p2p/net/swarm/swarm_event_test.go @@ -113,3 +113,91 @@ func TestNoDeadlockWhenConsumingConnectednessEvents(t *testing.T) { // The test should finish without deadlocking } + +func TestConnectednessEvents(t *testing.T) { + s1, sub1 := newSwarmWithSubscription(t) + const N = 100 + peers := make([]*Swarm, N) + for i := 0; i < N; i++ { + peers[i] = swarmt.GenSwarm(t) + } + + // First check all connected events + done := make(chan struct{}) + go func() { + defer close(done) + for i := 0; i < N; i++ { + e := <-sub1.Out() + evt, ok := e.(event.EvtPeerConnectednessChanged) + if !ok { + t.Error("invalid event received", e) + return + } + if evt.Connectedness != network.Connected { + t.Errorf("invalid event received: expected: Connected, got: %s", evt) + return + } + } + }() + for i := 0; i < N; i++ { + s1.Peerstore().AddAddrs(peers[i].LocalPeer(), []ma.Multiaddr{peers[i].ListenAddresses()[0]}, time.Hour) + _, err := s1.DialPeer(context.Background(), peers[i].LocalPeer()) + require.NoError(t, err) + } + select { + case <-done: + case <-time.After(10 * time.Second): + t.Fatal("expected all connectedness events to be completed") + } + + // Disconnect some peers + done = make(chan struct{}) + go func() { + defer close(done) + for i := 0; i < N/2; i++ { + e := <-sub1.Out() + evt, ok := e.(event.EvtPeerConnectednessChanged) + if !ok { + t.Error("invalid event received", e) + return + } + if evt.Connectedness != network.NotConnected { + t.Errorf("invalid event received: expected: NotConnected, got: %s", evt) + return + } + } + }() + for i := 0; i < N/2; i++ { + err := s1.ClosePeer(peers[i].LocalPeer()) + require.NoError(t, err) + } + select { + case <-done: + case <-time.After(10 * time.Second): + t.Fatal("expected all disconnected events to be completed") + } + + // Check for disconnected events on swarm close + done = make(chan struct{}) + go func() { + defer close(done) + for i := N / 2; i < N; i++ { + e := <-sub1.Out() + evt, ok := e.(event.EvtPeerConnectednessChanged) + if !ok { + t.Error("invalid event received", e) + return + } + if evt.Connectedness != network.NotConnected { + t.Errorf("invalid event received: expected: NotConnected, got: %s", evt) + return + } + } + }() + s1.Close() + select { + case <-done: + case <-time.After(10 * time.Second): + t.Fatal("expected all disconnected events after swarm close to be completed") + } +} From d8e7c31abd789b9b0e36e306bfbab7b1057b94a4 Mon Sep 17 00:00:00 2001 From: sukun Date: Thu, 2 May 2024 18:09:20 +0530 Subject: [PATCH 14/18] connectedness event deadlock test --- p2p/net/swarm/swarm.go | 2 ++ p2p/net/swarm/swarm_event_test.go | 45 +++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+) diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index bd05ebe7f4..0aef3f6fa2 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -813,10 +813,12 @@ func (s *Swarm) removeConn(c *Conn) { s.conns.Unlock() if oldState != newState { + fmt.Println("going to emit 
event", newState) s.emitter.Emit(event.EvtPeerConnectednessChanged{ Peer: p, Connectedness: newState, }) + fmt.Println("emitted event", newState) } } diff --git a/p2p/net/swarm/swarm_event_test.go b/p2p/net/swarm/swarm_event_test.go index 8d2b2d79ce..fe9067fda9 100644 --- a/p2p/net/swarm/swarm_event_test.go +++ b/p2p/net/swarm/swarm_event_test.go @@ -2,6 +2,7 @@ package swarm_test import ( "context" + "fmt" "testing" "time" @@ -12,6 +13,7 @@ import ( swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing" ma "github.com/multiformats/go-multiaddr" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -201,3 +203,46 @@ func TestConnectednessEvents(t *testing.T) { t.Fatal("expected all disconnected events after swarm close to be completed") } } + +func TestConnectednessEventDeadlock(t *testing.T) { + s1, sub1 := newSwarmWithSubscription(t) + const N = 100 + peers := make([]*Swarm, N) + for i := 0; i < N; i++ { + peers[i] = swarmt.GenSwarm(t) + } + + // First check all connected events + done := make(chan struct{}) + go func() { + defer close(done) + count := 0 + for count < N { + e := <-sub1.Out() + // sleep to simulate a slow consumer + evt, ok := e.(event.EvtPeerConnectednessChanged) + if !ok { + t.Error("invalid event received", e) + return + } + if evt.Connectedness != network.Connected { + continue + } + count++ + fmt.Println(count) + s1.ClosePeer(evt.Peer) + } + }() + for i := 0; i < N; i++ { + s1.Peerstore().AddAddrs(peers[i].LocalPeer(), []ma.Multiaddr{peers[i].ListenAddresses()[0]}, time.Hour) + go func(i int) { + _, err := s1.DialPeer(context.Background(), peers[i].LocalPeer()) + assert.NoError(t, err) + }(i) + } + select { + case <-done: + case <-time.After(100 * time.Second): + t.Fatal("expected all connectedness events to be completed") + } +} From 02b210d10ce95661edfe07d4d7a16a801567bbcb Mon Sep 17 00:00:00 2001 From: sukun Date: Thu, 2 May 2024 18:57:39 +0530 Subject: [PATCH 15/18] fix deadlock in sending connectedness events --- p2p/net/swarm/swarm.go | 125 ++++++++++++++++++++---------- p2p/net/swarm/swarm_event_test.go | 10 +-- 2 files changed, 87 insertions(+), 48 deletions(-) diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index 0aef3f6fa2..d2d49adbe2 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -144,7 +144,9 @@ type Swarm struct { // down before continuing. 
refs sync.WaitGroup

- emitter event.Emitter
+ emitter event.Emitter
+ connectednessEventCh chan struct{}
+ connectednessEmitterDone chan struct{}

 rcmgr network.ResourceManager

@@ -156,8 +158,9 @@ type Swarm struct {

 conns struct {
 sync.RWMutex

- m map[peer.ID][]*Conn
- connectedness map[peer.ID]network.Connectedness
+ m map[peer.ID][]*Conn
+ connectednessEventQueue map[peer.ID][]network.Connectedness
+ lastConnectednessEvent map[peer.ID]network.Connectedness
 }

 listeners struct {
@@ -217,15 +220,17 @@ func NewSwarm(local peer.ID, peers peerstore.Peerstore, eventBus event.Bus, opts
 }
 ctx, cancel := context.WithCancel(context.Background())
 s := &Swarm{
- local: local,
- peers: peers,
- emitter: emitter,
- ctx: ctx,
- ctxCancel: cancel,
- dialTimeout: defaultDialTimeout,
- dialTimeoutLocal: defaultDialTimeoutLocal,
- maResolver: madns.DefaultResolver,
- dialRanker: DefaultDialRanker,
+ local: local,
+ peers: peers,
+ emitter: emitter,
+ connectednessEventCh: make(chan struct{}, 1),
+ connectednessEmitterDone: make(chan struct{}),
+ ctx: ctx,
+ ctxCancel: cancel,
+ dialTimeout: defaultDialTimeout,
+ dialTimeoutLocal: defaultDialTimeoutLocal,
+ maResolver: madns.DefaultResolver,
+ dialRanker: DefaultDialRanker,

 // A black hole is a binary property. On a network if UDP dials are blocked or there is
 // no IPv6 connectivity, all dials will fail. So a low success rate of 5 out of 100 dials
@@ -235,7 +240,8 @@ func NewSwarm(local peer.ID, peers peerstore.Peerstore, eventBus event.Bus, opts
 }

 s.conns.m = make(map[peer.ID][]*Conn)
- s.conns.connectedness = make(map[peer.ID]network.Connectedness)
+ s.conns.connectednessEventQueue = make(map[peer.ID][]network.Connectedness)
+ s.conns.lastConnectednessEvent = make(map[peer.ID]network.Connectedness)
 s.listeners.m = make(map[transport.Listener]struct{})
 s.transports.m = make(map[int]transport.Transport)
 s.notifs.m = make(map[network.Notifiee]struct{})
@@ -256,7 +262,7 @@ func NewSwarm(local peer.ID, peers peerstore.Peerstore, eventBus event.Bus, opts
 s.backf.init(s.ctx)

 s.bhd = newBlackHoleDetector(s.udpBlackHoleConfig, s.ipv6BlackHoleConfig, s.metricsTracer)
-
+ go s.connectednessEventEmitter()
 return s, nil
 }

@@ -308,12 +314,15 @@ func (s *Swarm) close() {

 // Wait for everything to finish.
 s.refs.Wait()
+ close(s.connectednessEventCh)
+ <-s.connectednessEmitterDone
 s.emitter.Close()

 // Remove the connectedness map only after we have closed the connection and sent all the disconnection
 // events
 s.conns.Lock()
- s.conns.connectedness = nil
+ s.conns.connectednessEventQueue = nil
+ s.conns.lastConnectednessEvent = nil
 s.conns.Unlock()

 // Now close out any transports (if necessary). Do this after closing
@@ -397,12 +406,9 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,
 return nil, ErrSwarmClosed
 }

- oldState := s.conns.connectedness[p]
-
 c.streams.m = make(map[*Stream]struct{})
 s.conns.m[p] = append(s.conns.m[p], c)
- newState := s.connectednessUnlocked(p)
- s.conns.connectedness[p] = newState
+ s.maybeEnqueueConnectednessUnlocked(p)

 // Add two swarm refs:
 // * One will be decremented after the close notifications fire in Conn.doClose
@@ -428,15 +434,6 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,
 s.directConnNotifs.Unlock()
 }

- // Emit event after releasing `s.conns` lock so that a consumer can still
- // use swarm methods that need the `s.conns` lock.
- if oldState != newState {
- s.emitter.Emit(event.EvtPeerConnectednessChanged{
- Peer: p,
- Connectedness: newState,
- })
- }
-
 s.notifyAll(func(f network.Notifiee) {
 f.Connected(s, c)
 })
@@ -783,8 +780,6 @@ func (s *Swarm) removeConn(c *Conn) {
 p := c.RemotePeer()

 s.conns.Lock()
-
- oldState := s.conns.connectedness[p]
 cs := s.conns.m[p]
 if len(cs) == 1 {
 delete(s.conns.m, p)
@@ -801,24 +796,70 @@ func (s *Swarm) removeConn(c *Conn) {
 }
 }
 }
+ s.maybeEnqueueConnectednessUnlocked(p)
+ s.conns.Unlock()
+}
+
+func (s *Swarm) lastConnectednessEventUnlocked(p peer.ID) network.Connectedness {
+ events := s.conns.connectednessEventQueue[p]
+ if len(events) > 0 {
+ return events[len(events)-1]
+ }
+ return s.conns.lastConnectednessEvent[p]
+}
+func (s *Swarm) maybeEnqueueConnectednessUnlocked(p peer.ID) {
+ oldState := s.lastConnectednessEventUnlocked(p)
 newState := s.connectednessUnlocked(p)
- if s.conns.connectedness != nil { // This should always be non-nil, but a check doesn't hurt
- if newState == network.NotConnected {
- delete(s.conns.connectedness, p)
+ if oldState != newState {
+ if s.conns.connectednessEventQueue != nil {
+ s.conns.connectednessEventQueue[p] = append(s.conns.connectednessEventQueue[p], newState)
+ select {
+ case s.connectednessEventCh <- struct{}{}:
+ default:
+ }
 } else {
- s.conns.connectedness[p] = newState
+ log.Errorf("SWARM BUG: nil connectedness map")
 }
 }
- s.conns.Unlock()
+}

- if oldState != newState {
- fmt.Println("going to emit event", newState)
- s.emitter.Emit(event.EvtPeerConnectednessChanged{
- Peer: p,
- Connectedness: newState,
- })
- fmt.Println("emitted event", newState)
+func (s *Swarm) connectednessEventEmitter() {
+ defer close(s.connectednessEmitterDone)
+ for range s.connectednessEventCh {
+ for {
+ var c network.Connectedness
+ var peer peer.ID
+ s.conns.Lock()
+ for p, v := range s.conns.connectednessEventQueue {
+ if len(v) == 0 {
+ // this shouldn't happen
+ delete(s.conns.connectednessEventQueue, p)
+ log.Errorf("SWARM BUG: empty connectedness event slice %v %v", p, v)
+ continue
+ }
+ c = v[0]
+ peer = p
+ s.conns.connectednessEventQueue[p] = v[1:]
+ if len(s.conns.connectednessEventQueue[p]) == 0 {
+ delete(s.conns.connectednessEventQueue, p)
+ }
+ if c == network.NotConnected {
+ delete(s.conns.lastConnectednessEvent, p)
+ } else {
+ s.conns.lastConnectednessEvent[p] = c
+ }
+ break
+ }
+ s.conns.Unlock()
+ if peer == "" {
+ break
+ }
+ s.emitter.Emit(event.EvtPeerConnectednessChanged{
+ Peer: peer,
+ Connectedness: c,
+ })
+ }
 }
 }
diff --git a/p2p/net/swarm/swarm_event_test.go b/p2p/net/swarm/swarm_event_test.go
index fe9067fda9..12393c5b1d 100644
--- a/p2p/net/swarm/swarm_event_test.go
+++ b/p2p/net/swarm/swarm_event_test.go
@@ -2,7 +2,6 @@ package swarm_test

 import (
 "context"
- "fmt"
 "testing"
 "time"

@@ -68,6 +67,10 @@ func TestConnectednessEventsSingleConn(t *testing.T) {
 }

 func TestNoDeadlockWhenConsumingConnectednessEvents(t *testing.T) {
+ ctx := context.Background()
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
 dialerEventBus := eventbus.NewBus()
 dialer := swarmt.GenSwarm(t, swarmt.OptDialOnly, swarmt.EventBus(dialerEventBus))
 defer dialer.Close()
@@ -87,10 +90,6 @@ func TestNoDeadlockWhenConsumingConnectednessEvents(t *testing.T) {
 sub, err := dialerEventBus.Subscribe(new(event.EvtPeerConnectednessChanged))
 require.NoError(t, err)

- ctx := context.Background()
- ctx, cancel := context.WithCancel(ctx)
- defer cancel()
-
 // A slow consumer
 go func() {
 for {
@@ -229,7 +228,6 @@ func TestConnectednessEventDeadlock(t *testing.T) {
 continue
 }
 count++
- fmt.Println(count)
 s1.ClosePeer(evt.Peer)
 }
 }()
From 4573ce64b16d6c0e2654a0c22f4b36c821bbd2ae Mon Sep 17 00:00:00 2001
From: sukun
Date: Thu, 2 May 2024 19:56:00 +0530
Subject: [PATCH 16/18] simplify connectedness events

---
 p2p/net/swarm/swarm.go | 104 +++++++++++++----------------------
 1 file changed, 33 insertions(+), 71 deletions(-)

diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go
index d2d49adbe2..a8f08da148 100644
--- a/p2p/net/swarm/swarm.go
+++ b/p2p/net/swarm/swarm.go
@@ -158,9 +158,8 @@ type Swarm struct {

 conns struct {
 sync.RWMutex

- m map[peer.ID][]*Conn
- connectednessEventQueue map[peer.ID][]network.Connectedness
- lastConnectednessEvent map[peer.ID]network.Connectedness
+ m map[peer.ID][]*Conn
+ connectednessEvents chan peer.ID
 }

 listeners struct {
@@ -240,8 +239,7 @@ func NewSwarm(local peer.ID, peers peerstore.Peerstore, eventBus event.Bus, opts
 }

 s.conns.m = make(map[peer.ID][]*Conn)
- s.conns.connectednessEventQueue = make(map[peer.ID][]network.Connectedness)
- s.conns.lastConnectednessEvent = make(map[peer.ID]network.Connectedness)
+ s.conns.connectednessEvents = make(chan peer.ID, 32)
 s.listeners.m = make(map[transport.Listener]struct{})
 s.transports.m = make(map[int]transport.Transport)
 s.notifs.m = make(map[network.Notifiee]struct{})
@@ -314,17 +312,10 @@ func (s *Swarm) close() {

 // Wait for everything to finish.
 s.refs.Wait()
- close(s.connectednessEventCh)
+ close(s.conns.connectednessEvents)
 <-s.connectednessEmitterDone
 s.emitter.Close()

- // Remove the connectedness map only after we have closed the connection and sent all the disconnection
- // events
- s.conns.Lock()
- s.conns.connectednessEventQueue = nil
- s.conns.lastConnectednessEvent = nil
- s.conns.Unlock()
-
 // Now close out any transports (if necessary). Do this after closing
 // all connections/listeners.
 s.transports.Lock()
@@ -408,8 +399,6 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,

 c.streams.m = make(map[*Stream]struct{})
 s.conns.m[p] = append(s.conns.m[p], c)
- s.maybeEnqueueConnectednessUnlocked(p)
-
 // Add two swarm refs:
 // * One will be decremented after the close notifications fire in Conn.doClose
 // * The other will be decremented when Conn.start exits.
@@ -420,6 +409,13 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,
 c.notifyLk.Lock()
 s.conns.Unlock()

+ // Block this goroutine until this request is enqueued.
+ // This ensures that there are only a finite number of goroutines waiting to send
+ // the connectedness event on the disconnection side in swarm.removeConn.
+ // This holds because the goroutine that enqueues the disconnection event can only be
+ // started from a subscriber, a notifier, or after calling c.start.
+ s.conns.connectednessEvents <- p
+
 if !isLimited {
 // Notify goroutines waiting for a direct connection
 //
@@ -796,68 +792,34 @@ func (s *Swarm) removeConn(c *Conn) {
 }
 }
 }
- s.maybeEnqueueConnectednessUnlocked(p)
 s.conns.Unlock()
-}
-
-func (s *Swarm) lastConnectednessEventUnlocked(p peer.ID) network.Connectedness {
- events := s.conns.connectednessEventQueue[p]
- if len(events) > 0 {
- return events[len(events)-1]
- }
- return s.conns.lastConnectednessEvent[p]
-}
-
-func (s *Swarm) maybeEnqueueConnectednessUnlocked(p peer.ID) {
- oldState := s.lastConnectednessEventUnlocked(p)
- newState := s.connectednessUnlocked(p)
- if oldState != newState {
- if s.conns.connectednessEventQueue != nil {
- s.conns.connectednessEventQueue[p] = append(s.conns.connectednessEventQueue[p], newState)
- select {
- case s.connectednessEventCh <- struct{}{}:
- default:
- }
- } else {
- log.Errorf("SWARM BUG: nil connectedness map")
- }
- }
+ // Do this in a separate goroutine so we don't block the caller.
+ // This ensures that if an event subscriber closes the connection from the subscription
+ // goroutine, this doesn't deadlock.
+ s.refs.Add(1)
+ go func() {
+ defer s.refs.Done()
+ s.conns.connectednessEvents <- p
+ }()
+}

 func (s *Swarm) connectednessEventEmitter() {
 defer close(s.connectednessEmitterDone)
- for range s.connectednessEventCh {
- for {
- var c network.Connectedness
- var peer peer.ID
- s.conns.Lock()
- for p, v := range s.conns.connectednessEventQueue {
- if len(v) == 0 {
- // this shouldn't happen
- delete(s.conns.connectednessEventQueue, p)
- log.Errorf("SWARM BUG: empty connectedness event slice %v %v", p, v)
- continue
- }
- c = v[0]
- peer = p
- s.conns.connectednessEventQueue[p] = v[1:]
- if len(s.conns.connectednessEventQueue[p]) == 0 {
- delete(s.conns.connectednessEventQueue, p)
- }
- if c == network.NotConnected {
- delete(s.conns.lastConnectednessEvent, p)
- } else {
- s.conns.lastConnectednessEvent[p] = c
- }
- break
- }
- s.conns.Unlock()
- if peer == "" {
- break
- }
+ lastConnectednessEvents := make(map[peer.ID]network.Connectedness)
+ for p := range s.conns.connectednessEvents {
+ s.conns.Lock()
+ oldState := lastConnectednessEvents[p]
+ newState := s.connectednessUnlocked(p)
+ if newState != network.NotConnected {
+ lastConnectednessEvents[p] = newState
+ } else {
+ delete(lastConnectednessEvents, p)
+ }
+ s.conns.Unlock()
+ if newState != oldState {
 s.emitter.Emit(event.EvtPeerConnectednessChanged{
- Peer: peer,
- Connectedness: c,
+ Peer: p,
+ Connectedness: newState,
 })
 }
 }
From 25ccbf2644fe41286d954fba6de4094eeb0208aa Mon Sep 17 00:00:00 2001
From: sukun
Date: Fri, 3 May 2024 21:29:53 +0530
Subject: [PATCH 17/18] Always send 1 event for a connection

---
 p2p/net/swarm/connectedness_event_emitter.go | 143 +++++++++++++++++++
 p2p/net/swarm/dial_worker.go | 2 +-
 p2p/net/swarm/swarm.go | 109 +++++---------
 p2p/net/swarm/swarm_conn.go | 9 +-
 p2p/net/swarm/swarm_event_test.go | 66 +++++++++
 p2p/net/swarm/swarm_listen.go | 3 +-
 6 files changed, 253 insertions(+), 79 deletions(-)
 create mode 100644 p2p/net/swarm/connectedness_event_emitter.go

diff --git a/p2p/net/swarm/connectedness_event_emitter.go b/p2p/net/swarm/connectedness_event_emitter.go
new file mode 100644
index 0000000000..793134136b
--- /dev/null
+++ b/p2p/net/swarm/connectedness_event_emitter.go
@@ -0,0 +1,143 @@
+package swarm
+
+import (
+ "context"
+ "sync"
+
+ "github.com/libp2p/go-libp2p/core/event"
+ "github.com/libp2p/go-libp2p/core/network"
+ "github.com/libp2p/go-libp2p/core/peer"
+)
+
+// connectednessEventEmitter emits PeerConnectednessChanged events.
+// We ensure that for any peer we connected to, we always send at least one NotConnected event after
+// the peer disconnects. This is because peers can observe a connection before they are notified
+// of the connection by a peer connectedness changed event.
+type connectednessEventEmitter struct {
+ mx sync.RWMutex
+ // newConns is the channel that holds the peerIDs we recently connected to
+ newConns chan peer.ID
+ removeConnsMx sync.Mutex
+ // removeConns is a slice of peerIDs we have recently closed connections to
+ removeConns []peer.ID
+ // lastEvent is the last connectedness event sent for a particular peer.
+ lastEvent map[peer.ID]network.Connectedness
+ // connectedness is the function that gives the peer's current connectedness state
+ connectedness func(peer.ID) network.Connectedness
+ // emitter is the PeerConnectednessChanged event emitter
+ emitter event.Emitter
+ wg sync.WaitGroup
+ removeConnNotif chan struct{}
+ ctx context.Context
+ cancel context.CancelFunc
+}
+
+func newConnectednessEventEmitter(connectedness func(peer.ID) network.Connectedness, emitter event.Emitter) *connectednessEventEmitter {
+ ctx, cancel := context.WithCancel(context.Background())
+ c := &connectednessEventEmitter{
+ newConns: make(chan peer.ID, 32),
+ lastEvent: make(map[peer.ID]network.Connectedness),
+ removeConnNotif: make(chan struct{}, 1),
+ connectedness: connectedness,
+ emitter: emitter,
+ ctx: ctx,
+ cancel: cancel,
+ }
+ c.wg.Add(1)
+ go c.runEmitter()
+ return c
+}
+
+func (c *connectednessEventEmitter) AddConn(p peer.ID) {
+ c.mx.RLock()
+ defer c.mx.RUnlock()
+ if c.ctx.Err() != nil {
+ return
+ }
+
+ c.newConns <- p
+}
+
+func (c *connectednessEventEmitter) RemoveConn(p peer.ID) {
+ c.mx.RLock()
+ defer c.mx.RUnlock()
+ if c.ctx.Err() != nil {
+ return
+ }
+
+ c.removeConnsMx.Lock()
+ // This queue is not unbounded since we block in the AddConn method,
+ // so we add connections to the swarm only at a rate at which
+ // the subscriber for our peer connectedness changed events can consume them.
+ // If a lot of open connections are closed at once, increasing the disconnected
+ // event notification rate, the rate of adding connections to the swarm would
+ // proportionately reduce, which would eventually reduce the length of this slice.
+ c.removeConns = append(c.removeConns, p)
+ c.removeConnsMx.Unlock()
+
+ select {
+ case c.removeConnNotif <- struct{}{}:
+ default:
+ }
+}
+
+func (c *connectednessEventEmitter) Close() {
+ c.cancel()
+ c.wg.Wait()
+}
+
+func (c *connectednessEventEmitter) runEmitter() {
+ defer c.wg.Done()
+ for {
+ select {
+ case p := <-c.newConns:
+ c.notifyPeer(p, true)
+ case <-c.removeConnNotif:
+ c.sendConnRemovedNotifications()
+ case <-c.ctx.Done():
+ c.mx.Lock() // Wait for all pending AddConn & RemoveConn operations to complete
+ defer c.mx.Unlock()
+ for {
+ select {
+ case p := <-c.newConns:
+ c.notifyPeer(p, true)
+ case <-c.removeConnNotif:
+ c.sendConnRemovedNotifications()
+ default:
+ return
+ }
+ }
+ }
+ }
+}
+
+func (c *connectednessEventEmitter) notifyPeer(p peer.ID, forceNotConnectedEvent bool) {
+ oldState := c.lastEvent[p]
+ c.lastEvent[p] = c.connectedness(p)
+ if c.lastEvent[p] == network.NotConnected {
+ delete(c.lastEvent, p)
+ }
+ if (forceNotConnectedEvent && c.lastEvent[p] == network.NotConnected) || c.lastEvent[p] != oldState {
+ c.emitter.Emit(event.EvtPeerConnectednessChanged{
+ Peer: p,
+ Connectedness: c.lastEvent[p],
+ })
+ }
+}
+
+func (c *connectednessEventEmitter) sendConnRemovedNotifications() {
+ c.removeConnsMx.Lock()
+ defer c.removeConnsMx.Unlock()
+ for {
+ if len(c.removeConns) == 0 {
+ return
+ }
+ p := c.removeConns[0]
+ c.removeConns[0] = ""
+ c.removeConns = c.removeConns[1:]
+
+ c.removeConnsMx.Unlock()
+ c.notifyPeer(p, false)
+ c.removeConnsMx.Lock()
+ }
+}
diff --git a/p2p/net/swarm/dial_worker.go b/p2p/net/swarm/dial_worker.go
index 0cac6e4fa3..2ebc4e1efd 100644
--- a/p2p/net/swarm/dial_worker.go
+++ b/p2p/net/swarm/dial_worker.go
@@ -340,7 +340,7 @@ loop:
 ad.expectedTCPUpgradeTime = time.Time{}
 if res.Conn != nil {
 // we got a connection, add it to the swarm
- conn, err := w.s.addConn(res.Conn, network.DirOutbound)
+ conn, err := w.s.addConn(ad.ctx, res.Conn, network.DirOutbound)
 if err != nil {
 // oops no, we failed to add it to the swarm
 res.Conn.Close()
diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go
index a8f08da148..8e0db0b6d8 100644
--- a/p2p/net/swarm/swarm.go
+++ b/p2p/net/swarm/swarm.go
@@ -144,9 +144,7 @@ type Swarm struct {
 // down before continuing.
 refs sync.WaitGroup

- emitter event.Emitter
- connectednessEventCh chan struct{}
- connectednessEmitterDone chan struct{}
+ emitter event.Emitter

 rcmgr network.ResourceManager

@@ -158,8 +156,7 @@ type Swarm struct {

 conns struct {
 sync.RWMutex

- m map[peer.ID][]*Conn
- connectednessEvents chan peer.ID
+ m map[peer.ID][]*Conn
 }

 listeners struct {
@@ -206,9 +203,10 @@ type Swarm struct {

 dialRanker network.DialRanker

- udpBlackHoleConfig blackHoleConfig
- ipv6BlackHoleConfig blackHoleConfig
- bhd *blackHoleDetector
+ udpBlackHoleConfig blackHoleConfig
+ ipv6BlackHoleConfig blackHoleConfig
+ bhd *blackHoleDetector
+ connectednessEventEmitter *connectednessEventEmitter
 }

 // NewSwarm constructs a Swarm.
@@ -219,17 +217,15 @@ func NewSwarm(local peer.ID, peers peerstore.Peerstore, eventBus event.Bus, opts
 }
 ctx, cancel := context.WithCancel(context.Background())
 s := &Swarm{
- local: local,
- peers: peers,
- emitter: emitter,
- connectednessEventCh: make(chan struct{}, 1),
- connectednessEmitterDone: make(chan struct{}),
- ctx: ctx,
- ctxCancel: cancel,
- dialTimeout: defaultDialTimeout,
- dialTimeoutLocal: defaultDialTimeoutLocal,
- maResolver: madns.DefaultResolver,
- dialRanker: DefaultDialRanker,
+ local: local,
+ peers: peers,
+ emitter: emitter,
+ ctx: ctx,
+ ctxCancel: cancel,
+ dialTimeout: defaultDialTimeout,
+ dialTimeoutLocal: defaultDialTimeoutLocal,
+ maResolver: madns.DefaultResolver,
+ dialRanker: DefaultDialRanker,

 // A black hole is a binary property. On a network if UDP dials are blocked or there is
 // no IPv6 connectivity, all dials will fail. So a low success rate of 5 out of 100 dials
@@ -239,11 +235,11 @@ func NewSwarm(local peer.ID, peers peerstore.Peerstore, eventBus event.Bus, opts
 }

 s.conns.m = make(map[peer.ID][]*Conn)
- s.conns.connectednessEvents = make(chan peer.ID, 32)
 s.listeners.m = make(map[transport.Listener]struct{})
 s.transports.m = make(map[int]transport.Transport)
 s.notifs.m = make(map[network.Notifiee]struct{})
 s.directConnNotifs.m = make(map[peer.ID][]chan struct{})
+ s.connectednessEventEmitter = newConnectednessEventEmitter(s.Connectedness, emitter)

 for _, opt := range opts {
 if err := opt(s); err != nil {
@@ -260,7 +256,6 @@ func NewSwarm(local peer.ID, peers peerstore.Peerstore, eventBus event.Bus, opts
 s.backf.init(s.ctx)

 s.bhd = newBlackHoleDetector(s.udpBlackHoleConfig, s.ipv6BlackHoleConfig, s.metricsTracer)
- go s.connectednessEventEmitter()
 return s, nil
 }

@@ -312,8 +307,7 @@ func (s *Swarm) close() {

 // Wait for everything to finish.
 s.refs.Wait()
- close(s.conns.connectednessEvents)
- <-s.connectednessEmitterDone
+ s.connectednessEventEmitter.Close()
 s.emitter.Close()

 // Now close out any transports (if necessary). Do this after closing
@@ -344,7 +338,7 @@ func (s *Swarm) close() {
 wg.Wait()
 }

-func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, error) {
+func (s *Swarm) addConn(ctx context.Context, tc transport.CapableConn, dir network.Direction) (*Conn, error) {
 var (
 p = tc.RemotePeer()
 addr = tc.RemoteMultiaddr()
@@ -403,18 +397,15 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,
 // * One will be decremented after the close notifications fire in Conn.doClose
 // * The other will be decremented when Conn.start exits.
 s.refs.Add(2)
-
 // Take the notification lock before releasing the conns lock to block
 // Disconnect notifications until after the Connect notifications done.
+ // This lock also ensures that swarm.refs.Wait() exits after we have
+ // enqueued the peer connectedness changed notification.
+ // TODO: Fix this fragility by taking a swarm ref for dial worker loop
 c.notifyLk.Lock()
 s.conns.Unlock()

- // Block this goroutine until this request is enqueued.
- // This ensures that there are only a finite number of goroutines waiting to send
- // the connectedness event on the disconnection side in swarm.removeConn.
- // This holds because the goroutine that enqueues the disconnection event can only be
- // started from a subscriber, a notifier, or after calling c.start.
- s.conns.connectednessEvents <- p
+ s.connectednessEventEmitter.AddConn(p)

 if !isLimited {
 // Notify goroutines waiting for a direct connection
 //
@@ -429,7 +420,6 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,
 delete(s.directConnNotifs.m, p)
 s.directConnNotifs.Unlock()
 }
-
 s.notifyAll(func(f network.Notifiee) {
 f.Connected(s, c)
 })
@@ -777,52 +767,21 @@ func (s *Swarm) removeConn(c *Conn) {

 s.conns.Lock()
 cs := s.conns.m[p]
- if len(cs) == 1 {
- delete(s.conns.m, p)
- } else {
- for i, ci := range cs {
- if ci == c {
- // NOTE: We're intentionally preserving order.
- // This way, connections to a peer are always
- // sorted oldest to newest.
- copy(cs[i:], cs[i+1:])
- cs[len(cs)-1] = nil
- s.conns.m[p] = cs[:len(cs)-1]
- break
- }
+ for i, ci := range cs {
+ if ci == c {
+ // NOTE: We're intentionally preserving order.
+ // This way, connections to a peer are always
+ // sorted oldest to newest.
+ copy(cs[i:], cs[i+1:])
+ cs[len(cs)-1] = nil
+ s.conns.m[p] = cs[:len(cs)-1]
+ break
 }
 }
- s.conns.Unlock()
- // Do this in a separate goroutine so we don't block the caller.
- // This ensures that if an event subscriber closes the connection from the subscription
- // goroutine, this doesn't deadlock.
- s.refs.Add(1)
- go func() {
- defer s.refs.Done()
- s.conns.connectednessEvents <- p
- }()
-}
-
-func (s *Swarm) connectednessEventEmitter() {
- defer close(s.connectednessEmitterDone)
- lastConnectednessEvents := make(map[peer.ID]network.Connectedness)
- for p := range s.conns.connectednessEvents {
- s.conns.Lock()
- oldState := lastConnectednessEvents[p]
- newState := s.connectednessUnlocked(p)
- if newState != network.NotConnected {
- lastConnectednessEvents[p] = newState
- } else {
- delete(lastConnectednessEvents, p)
- }
- s.conns.Unlock()
- if newState != oldState {
- s.emitter.Emit(event.EvtPeerConnectednessChanged{
- Peer: p,
- Connectedness: newState,
- })
- }
+ if len(s.conns.m[p]) == 0 {
+ delete(s.conns.m, p)
 }
+ s.conns.Unlock()
 }

 // String returns a string representation of Network.
diff --git a/p2p/net/swarm/swarm_conn.go b/p2p/net/swarm/swarm_conn.go
index 17ae1dffae..38e942cce8 100644
--- a/p2p/net/swarm/swarm_conn.go
+++ b/p2p/net/swarm/swarm_conn.go
@@ -73,6 +73,11 @@ func (c *Conn) doClose() {

 c.err = c.conn.Close()

+ // Send the connectedness event after closing the connection.
+ // This ensures that both remote connection close and local connection
+ // close events are sent after the underlying transport connection is closed.
+ c.swarm.connectednessEventEmitter.RemoveConn(c.RemotePeer())
+
 // This is just for cleaning up state. The connection has already been closed.
 // We *could* optimize this but it really isn't worth it.
for s := range streams {
@@ -85,10 +90,11 @@ func (c *Conn) doClose() {

 c.notifyLk.Lock()
 defer c.notifyLk.Unlock()

+ // Only notify for disconnection if we notified for connection
 c.swarm.notifyAll(func(f network.Notifiee) {
 f.Disconnected(c.swarm, c)
 })
- c.swarm.refs.Done() // taken in Swarm.addConn
+ c.swarm.refs.Done()
 }()
 }

@@ -108,7 +114,6 @@ func (c *Conn) start() {
 go func() {
 defer c.swarm.refs.Done()
 defer c.Close()
-
 for {
 ts, err := c.conn.AcceptStream()
 if err != nil {
diff --git a/p2p/net/swarm/swarm_event_test.go b/p2p/net/swarm/swarm_event_test.go
index 12393c5b1d..0a06a98fe4 100644
--- a/p2p/net/swarm/swarm_event_test.go
+++ b/p2p/net/swarm/swarm_event_test.go
@@ -2,6 +2,8 @@ package swarm_test

 import (
 "context"
+ "fmt"
+ "sync"
 "testing"
 "time"

@@ -244,3 +246,67 @@ func TestConnectednessEventDeadlock(t *testing.T) {
 t.Fatal("expected all connectedness events to be completed")
 }
 }
+
+func TestConnectednessEventDeadlockWithDial(t *testing.T) {
+ s1, sub1 := newSwarmWithSubscription(t)
+ const N = 200
+ peers := make([]*Swarm, N)
+ for i := 0; i < N; i++ {
+ peers[i] = swarmt.GenSwarm(t)
+ }
+ peers2 := make([]*Swarm, N)
+ for i := 0; i < N; i++ {
+ peers2[i] = swarmt.GenSwarm(t)
+ }
+
+ // First check all connected events
+ done := make(chan struct{})
+ var subWG sync.WaitGroup
+ subWG.Add(1)
+ go func() {
+ defer subWG.Done()
+ count := 0
+ for {
+ var e interface{}
+ select {
+ case e = <-sub1.Out():
+ case <-done:
+ return
+ }
+ // sleep to simulate a slow consumer
+ evt, ok := e.(event.EvtPeerConnectednessChanged)
+ if !ok {
+ t.Error("invalid event received", e)
+ return
+ }
+ if evt.Connectedness != network.Connected {
+ continue
+ }
+ if count < N {
+ time.Sleep(10 * time.Millisecond)
+ ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
+ s1.Peerstore().AddAddrs(peers2[count].LocalPeer(), []ma.Multiaddr{peers2[count].ListenAddresses()[0]}, time.Hour)
+ s1.DialPeer(ctx, peers2[count].LocalPeer())
+ count++
+ cancel()
+ }
+ }
+ }()
+ var wg sync.WaitGroup
+ wg.Add(N)
+ for i := 0; i < N; i++ {
+ s1.Peerstore().AddAddrs(peers[i].LocalPeer(), []ma.Multiaddr{peers[i].ListenAddresses()[0]}, time.Hour)
+ go func(i int) {
+ ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+ s1.DialPeer(ctx, peers[i].LocalPeer())
+ cancel()
+ wg.Done()
+ }(i)
+ }
+ wg.Wait()
+ s1.Close()
+
+ close(done)
+ subWG.Wait()
+ fmt.Println("swarm closed")
+}
diff --git a/p2p/net/swarm/swarm_listen.go b/p2p/net/swarm/swarm_listen.go
index 0905e84513..2376a7e379 100644
--- a/p2p/net/swarm/swarm_listen.go
+++ b/p2p/net/swarm/swarm_listen.go
@@ -1,6 +1,7 @@
 package swarm

 import (
+ "context"
 "errors"
 "fmt"
 "time"
@@ -142,7 +143,7 @@ func (s *Swarm) AddListenAddr(a ma.Multiaddr) error {
 s.refs.Add(1)
 go func() {
 defer s.refs.Done()
- _, err := s.addConn(c, network.DirInbound)
+ _, err := s.addConn(context.Background(), c, network.DirInbound)
 switch err {
 case nil:
 case ErrSwarmClosed:
From b542b1529884e27e8b6b3a2418f2bcda64ab9b27 Mon Sep 17 00:00:00 2001
From: sukun
Date: Wed, 8 May 2024 00:15:49 +0530
Subject: [PATCH 18/18] review comments

---
 p2p/net/swarm/connectedness_event_emitter.go | 34 ++++++++++----------
 p2p/net/swarm/dial_worker.go | 2 +-
 p2p/net/swarm/swarm.go | 2 +-
 p2p/net/swarm/swarm_event_test.go | 2 --
 p2p/net/swarm/swarm_listen.go | 3 +--
 5 files changed, 20 insertions(+), 23 deletions(-)

diff --git a/p2p/net/swarm/connectedness_event_emitter.go b/p2p/net/swarm/connectedness_event_emitter.go
index 793134136b..07db583fc9 100644
--- a/p2p/net/swarm/connectedness_event_emitter.go
+++ b/p2p/net/swarm/connectedness_event_emitter.go
@@ -66,13 +66,14 @@ func (c *connectednessEventEmitter) RemoveConn(p peer.ID) {
 }

 c.removeConnsMx.Lock()
- // This queue is not unbounded since we block in the AddConn method,
- // so we add connections to the swarm only at a rate at which
- // the subscriber for our peer connectedness changed events can consume them.
- // If a lot of open connections are closed at once, increasing the disconnected
- // event notification rate, the rate of adding connections to the swarm would
- // proportionately reduce, which would eventually reduce the length of this slice.
+ // This queue is roughly bounded by the total number of added connections we
+ // have. If consumers of connectedness events are slow, we apply
+ // backpressure to AddConn operations.
+ //
+ // We purposefully don't block/backpressure here to avoid deadlocks, since it's
+ // reasonable for a consumer of the event to want to remove a connection.
 c.removeConns = append(c.removeConns, p)
+
 c.removeConnsMx.Unlock()

 select {
@@ -111,6 +112,12 @@ func (c *connectednessEventEmitter) runEmitter() {
 }
 }

+// notifyPeer sends the peer connectedness event using the emitter.
+// Use forceNotConnectedEvent = true to send a NotConnected event even if
+// no Connected event was sent for this peer.
+// If a peer is disconnected before we send the Connected event, we still
+// send the NotConnected event because a connection to the peer can be observed
+// in such cases.
 func (c *connectednessEventEmitter) notifyPeer(p peer.ID, forceNotConnectedEvent bool) {
 oldState := c.lastEvent[p]
 c.lastEvent[p] = c.connectedness(p)
@@ -127,17 +134,10 @@ func (c *connectednessEventEmitter) notifyPeer(p peer.ID, forceNotConnectedEvent

 func (c *connectednessEventEmitter) sendConnRemovedNotifications() {
 c.removeConnsMx.Lock()
- defer c.removeConnsMx.Unlock()
- for {
- if len(c.removeConns) == 0 {
- return
- }
- p := c.removeConns[0]
- c.removeConns[0] = ""
- c.removeConns = c.removeConns[1:]
-
- c.removeConnsMx.Unlock()
+ removeConns := c.removeConns
+ c.removeConns = nil
+ c.removeConnsMx.Unlock()
+ for _, p := range removeConns {
 c.notifyPeer(p, false)
- c.removeConnsMx.Lock()
 }
 }
diff --git a/p2p/net/swarm/dial_worker.go b/p2p/net/swarm/dial_worker.go
index 2ebc4e1efd..0cac6e4fa3 100644
--- a/p2p/net/swarm/dial_worker.go
+++ b/p2p/net/swarm/dial_worker.go
@@ -340,7 +340,7 @@ loop:
 ad.expectedTCPUpgradeTime = time.Time{}
 if res.Conn != nil {
 // we got a connection, add it to the swarm
- conn, err := w.s.addConn(ad.ctx, res.Conn, network.DirOutbound)
+ conn, err := w.s.addConn(res.Conn, network.DirOutbound)
 if err != nil {
 // oops no, we failed to add it to the swarm
 res.Conn.Close()
diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go
index 8e0db0b6d8..7897277cc7 100644
--- a/p2p/net/swarm/swarm.go
+++ b/p2p/net/swarm/swarm.go
@@ -338,7 +338,7 @@ func (s *Swarm) close() {
 wg.Wait()
 }

-func (s *Swarm) addConn(ctx context.Context, tc transport.CapableConn, dir network.Direction) (*Conn, error) {
+func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, error) {
 var (
 p = tc.RemotePeer()
 addr = tc.RemoteMultiaddr()
diff --git a/p2p/net/swarm/swarm_event_test.go b/p2p/net/swarm/swarm_event_test.go
index 0a06a98fe4..5010215fc2 100644
--- a/p2p/net/swarm/swarm_event_test.go
+++ b/p2p/net/swarm/swarm_event_test.go
@@ -2,7 +2,6 @@ package swarm_test

 import (
 "context"
- "fmt"
 "sync"
 "testing"
 "time"
@@ -308,5 +307,4 @@ func TestConnectednessEventDeadlockWithDial(t *testing.T) {

 close(done)
 subWG.Wait()
- fmt.Println("swarm closed")
 }
diff --git a/p2p/net/swarm/swarm_listen.go b/p2p/net/swarm/swarm_listen.go
index 2376a7e379..0905e84513 100644
--- a/p2p/net/swarm/swarm_listen.go
+++ b/p2p/net/swarm/swarm_listen.go
@@ -1,7 +1,6 @@
 package swarm

 import (
- "context"
 "errors"
 "fmt"
 "time"
@@ -143,7 +142,7 @@ func (s *Swarm) AddListenAddr(a ma.Multiaddr) error {
 s.refs.Add(1)
 go func() {
 defer s.refs.Done()
- _, err := s.addConn(context.Background(), c, network.DirInbound)
+ _, err := s.addConn(c, network.DirInbound)
 switch err {
 case nil:
 case ErrSwarmClosed:
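
Usage note: after this series, a subscriber can track peer connectivity from
EvtPeerConnectednessChanged alone, because every peer whose connection was
observed eventually gets at least one NotConnected event, and intermediate
states (such as the transient/limited state from patch 01) are reported
separately from Connected. The sketch below shows a minimal consumer built
only on the public go-libp2p APIs (libp2p.New, Host.EventBus); the main
wrapper and the printed strings are illustrative assumptions, not code from
these patches.

package main

import (
	"fmt"

	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p/core/event"
	"github.com/libp2p/go-libp2p/core/network"
	"github.com/libp2p/go-libp2p/core/peer"
)

func main() {
	h, err := libp2p.New()
	if err != nil {
		panic(err)
	}
	defer h.Close()

	// Subscribe to connectedness changes on the host's event bus.
	sub, err := h.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged))
	if err != nil {
		panic(err)
	}
	defer sub.Close()

	connected := make(map[peer.ID]struct{})
	for e := range sub.Out() {
		evt := e.(event.EvtPeerConnectednessChanged)
		switch evt.Connectedness {
		case network.Connected:
			// Fully connected; transient/limited connectivity is not
			// reported as Connected after this series.
			connected[evt.Peer] = struct{}{}
		case network.NotConnected:
			// Sent at least once for any peer whose connection we
			// observed, even if it disconnected before the Connected
			// event was emitted.
			delete(connected, evt.Peer)
		}
		fmt.Println(evt.Peer, evt.Connectedness, "total connected:", len(connected))
	}
}

Because the swarm now emits events from a dedicated goroutine with
backpressure, a consumer like this one may block or even dial and close
connections from inside the event loop without deadlocking the swarm, which
is exactly what the deadlock tests above exercise.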