From 1219f9ea13b503721ee15661e1df987f947bd04d Mon Sep 17 00:00:00 2001 From: sukun Date: Mon, 10 Jun 2024 19:50:17 +0530 Subject: [PATCH] review comments 2 --- config/config.go | 40 +++++++-------- defaults.go | 8 +-- options.go | 16 +++--- p2p/net/swarm/black_hole_detector.go | 30 ++++++------ p2p/net/swarm/black_hole_detector_test.go | 28 +++++------ p2p/net/swarm/swarm.go | 16 +++--- p2p/net/swarm/swarm_dial_test.go | 2 +- p2p/net/swarm/swarm_metrics.go | 22 ++++----- p2p/net/swarm/swarm_metrics_test.go | 4 +- p2p/protocol/autonatv2/autonat_test.go | 4 +- p2p/protocol/autonatv2/client.go | 8 +-- p2p/protocol/autonatv2/server.go | 20 +++++++- p2p/protocol/autonatv2/server_test.go | 59 +++++++++++++++-------- 13 files changed, 145 insertions(+), 112 deletions(-) diff --git a/config/config.go b/config/config.go index 93003370b7..a40461aed8 100644 --- a/config/config.go +++ b/config/config.go @@ -135,10 +135,10 @@ type Config struct { DisableAutoNATv2 bool - UDPBlackHoleFilter *swarm.BlackHoleFilter - CustomUDPBlackHoleFilter bool - IPv6BlackHoleFilter *swarm.BlackHoleFilter - CustomIPv6BlackHoleFilter bool + UDPBlackHoleSuccessCounter *swarm.BlackHoleSuccessCounter + CustomUDPBlackHoleSuccessCounter bool + IPv6BlackHoleSuccessCounter *swarm.BlackHoleSuccessCounter + CustomIPv6BlackHoleSuccessCounter bool } func (cfg *Config) makeSwarm(eventBus event.Bus, enableMetrics bool) (*swarm.Swarm, error) { @@ -174,8 +174,8 @@ func (cfg *Config) makeSwarm(eventBus event.Bus, enableMetrics bool) (*swarm.Swa } opts := append(cfg.SwarmOpts, - swarm.WithUDPBlackHoleFilter(cfg.UDPBlackHoleFilter), - swarm.WithIPv6BlackHoleFilter(cfg.IPv6BlackHoleFilter), + swarm.WithUDPBlackHoleSuccessCounter(cfg.UDPBlackHoleSuccessCounter), + swarm.WithIPv6BlackHoleSuccessCounter(cfg.IPv6BlackHoleSuccessCounter), ) if cfg.Reporter != nil { opts = append(opts, swarm.WithMetrics(cfg.Reporter)) @@ -215,18 +215,18 @@ func (cfg *Config) makeAutoNATV2Host() (host.Host, error) { } autoNatCfg := Config{ - Transports: cfg.Transports, - Muxers: cfg.Muxers, - SecurityTransports: cfg.SecurityTransports, - Insecure: cfg.Insecure, - PSK: cfg.PSK, - ConnectionGater: cfg.ConnectionGater, - Reporter: cfg.Reporter, - PeerKey: autonatPrivKey, - Peerstore: ps, - DialRanker: swarm.NoDelayDialRanker, - UDPBlackHoleFilter: cfg.UDPBlackHoleFilter, - IPv6BlackHoleFilter: cfg.IPv6BlackHoleFilter, + Transports: cfg.Transports, + Muxers: cfg.Muxers, + SecurityTransports: cfg.SecurityTransports, + Insecure: cfg.Insecure, + PSK: cfg.PSK, + ConnectionGater: cfg.ConnectionGater, + Reporter: cfg.Reporter, + PeerKey: autonatPrivKey, + Peerstore: ps, + DialRanker: swarm.NoDelayDialRanker, + UDPBlackHoleSuccessCounter: cfg.UDPBlackHoleSuccessCounter, + IPv6BlackHoleSuccessCounter: cfg.IPv6BlackHoleSuccessCounter, SwarmOpts: []swarm.Option{ // Don't update black hole state for failed autonat dials swarm.WithReadOnlyBlackHoleDetector(), @@ -572,8 +572,8 @@ func (cfg *Config) addAutoNAT(h *bhost.BasicHost) error { Peerstore: ps, DialRanker: swarm.NoDelayDialRanker, SwarmOpts: []swarm.Option{ - swarm.WithUDPBlackHoleFilter(nil), - swarm.WithIPv6BlackHoleFilter(nil), + swarm.WithUDPBlackHoleSuccessCounter(nil), + swarm.WithIPv6BlackHoleSuccessCounter(nil), }, } diff --git a/defaults.go b/defaults.go index 8067ac00b0..1aba3bd7e2 100644 --- a/defaults.go +++ b/defaults.go @@ -137,13 +137,13 @@ var DefaultPrometheusRegisterer = func(cfg *Config) error { var defaultUDPBlackHoleDetector = func(cfg *Config) error { // A black hole is a binary property. On a network if UDP dials are blocked, all dials will // fail. So a low success rate of 5 out 100 dials is good enough. - return cfg.Apply(UDPBlackHoleFilter(&swarm.BlackHoleFilter{N: 100, MinSuccesses: 5, Name: "UDP"})) + return cfg.Apply(UDPBlackHoleSuccessCounter(&swarm.BlackHoleSuccessCounter{N: 100, MinSuccesses: 5, Name: "UDP"})) } var defaultIPv6BlackHoleDetector = func(cfg *Config) error { // A black hole is a binary property. On a network if there is no IPv6 connectivity, all // dials will fail. So a low success rate of 5 out 100 dials is good enough. - return cfg.Apply(IPv6BlackHoleFilter(&swarm.BlackHoleFilter{N: 100, MinSuccesses: 5, Name: "IPv6"})) + return cfg.Apply(IPv6BlackHoleSuccessCounter(&swarm.BlackHoleSuccessCounter{N: 100, MinSuccesses: 5, Name: "IPv6"})) } // Complete list of default options and when to fallback on them. @@ -204,13 +204,13 @@ var defaults = []struct { }, { fallback: func(cfg *Config) bool { - return !cfg.CustomUDPBlackHoleFilter && cfg.UDPBlackHoleFilter == nil + return !cfg.CustomUDPBlackHoleSuccessCounter && cfg.UDPBlackHoleSuccessCounter == nil }, opt: defaultUDPBlackHoleDetector, }, { fallback: func(cfg *Config) bool { - return !cfg.CustomIPv6BlackHoleFilter && cfg.IPv6BlackHoleFilter == nil + return !cfg.CustomIPv6BlackHoleSuccessCounter && cfg.IPv6BlackHoleSuccessCounter == nil }, opt: defaultIPv6BlackHoleDetector, }, diff --git a/options.go b/options.go index bc09b9fb0a..de83c88450 100644 --- a/options.go +++ b/options.go @@ -618,20 +618,20 @@ func DisableAutoNATv2() Option { } } -// UDPBlackHoleFilter configures libp2p to use f as the black hole filter for UDP addrs -func UDPBlackHoleFilter(f *swarm.BlackHoleFilter) Option { +// UDPBlackHoleSuccessCounter configures libp2p to use f as the black hole filter for UDP addrs +func UDPBlackHoleSuccessCounter(f *swarm.BlackHoleSuccessCounter) Option { return func(cfg *Config) error { - cfg.UDPBlackHoleFilter = f - cfg.CustomUDPBlackHoleFilter = true + cfg.UDPBlackHoleSuccessCounter = f + cfg.CustomUDPBlackHoleSuccessCounter = true return nil } } -// IPv6BlackHoleFilter configures libp2p to use f as the black hole filter for IPv6 addrs -func IPv6BlackHoleFilter(f *swarm.BlackHoleFilter) Option { +// IPv6BlackHoleSuccessCounter configures libp2p to use f as the black hole filter for IPv6 addrs +func IPv6BlackHoleSuccessCounter(f *swarm.BlackHoleSuccessCounter) Option { return func(cfg *Config) error { - cfg.IPv6BlackHoleFilter = f - cfg.CustomIPv6BlackHoleFilter = true + cfg.IPv6BlackHoleSuccessCounter = f + cfg.CustomIPv6BlackHoleSuccessCounter = true return nil } } diff --git a/p2p/net/swarm/black_hole_detector.go b/p2p/net/swarm/black_hole_detector.go index 0aec8e6696..54782c1c01 100644 --- a/p2p/net/swarm/black_hole_detector.go +++ b/p2p/net/swarm/black_hole_detector.go @@ -29,14 +29,14 @@ func (st blackHoleState) String() string { } } -// BlackHoleFilter provides black hole filtering for dials. This filter should be used in concert +// BlackHoleSuccessCounter provides black hole filtering for dials. This filter should be used in concert // with a UDP or IPv6 address filter to detect UDP or IPv6 black hole. In a black holed environment, // dial requests are refused Requests are blocked if the number of successes in the last N dials is // less than MinSuccesses. // If a request succeeds in Blocked state, the filter state is reset and N subsequent requests are // allowed before reevaluating black hole state. Dials cancelled when some other concurrent dial // succeeded are counted as failures. A sufficiently large N prevents false negatives in such cases. -type BlackHoleFilter struct { +type BlackHoleSuccessCounter struct { // N is // 1. The minimum number of completed dials required before evaluating black hole state // 2. the minimum number of requests after which we probe the state of the black hole in @@ -63,7 +63,7 @@ type BlackHoleFilter struct { // RecordResult records the outcome of a dial. A successful dial in Blocked state will change the // state of the filter to Probing. A failed dial only blocks subsequent requests if the success // fraction over the last n outcomes is less than the minSuccessFraction of the filter. -func (b *BlackHoleFilter) RecordResult(success bool) { +func (b *BlackHoleSuccessCounter) RecordResult(success bool) { b.mu.Lock() defer b.mu.Unlock() @@ -91,7 +91,7 @@ func (b *BlackHoleFilter) RecordResult(success bool) { } // HandleRequest returns the result of applying the black hole filter for the request. -func (b *BlackHoleFilter) HandleRequest() blackHoleState { +func (b *BlackHoleSuccessCounter) HandleRequest() blackHoleState { b.mu.Lock() defer b.mu.Unlock() @@ -106,14 +106,14 @@ func (b *BlackHoleFilter) HandleRequest() blackHoleState { } } -func (b *BlackHoleFilter) reset() { +func (b *BlackHoleSuccessCounter) reset() { b.successes = 0 b.dialResults = b.dialResults[:0] b.requests = 0 b.updateState() } -func (b *BlackHoleFilter) updateState() { +func (b *BlackHoleSuccessCounter) updateState() { st := b.state if len(b.dialResults) < b.N { @@ -129,7 +129,7 @@ func (b *BlackHoleFilter) updateState() { } } -func (b *BlackHoleFilter) State() blackHoleState { +func (b *BlackHoleSuccessCounter) State() blackHoleState { b.mu.Lock() defer b.mu.Unlock() @@ -143,7 +143,7 @@ type blackHoleInfo struct { successFraction float64 } -func (b *BlackHoleFilter) info() blackHoleInfo { +func (b *BlackHoleSuccessCounter) info() blackHoleInfo { b.mu.Lock() defer b.mu.Unlock() @@ -165,8 +165,8 @@ func (b *BlackHoleFilter) info() blackHoleInfo { } } -// blackHoleDetector provides UDP and IPv6 black hole detection using a `blackHoleFilter` for each. -// For details of the black hole detection logic see `blackHoleFilter`. +// blackHoleDetector provides UDP and IPv6 black hole detection using a `BlackHoleSuccessCounter` for each. +// For details of the black hole detection logic see `BlackHoleSuccessCounter`. // In Read Only mode, detector doesn't update the state of underlying filters and refuses requests // when black hole state is unknown. This is useful for Swarms made specifically for services like // AutoNAT where we care about accurately reporting the reachability of a peer. @@ -175,7 +175,7 @@ func (b *BlackHoleFilter) info() blackHoleInfo { // of the black hole state are actually dialed and are not skipped because of dial prioritisation // logic. type blackHoleDetector struct { - udp, ipv6 *BlackHoleFilter + udp, ipv6 *BlackHoleSuccessCounter mt MetricsTracer readOnly bool } @@ -236,7 +236,7 @@ func (d *blackHoleDetector) FilterAddrs(addrs []ma.Multiaddr) (valid []ma.Multia ), blackHoled } -// RecordResult updates the state of the relevant blackHoleFilters for addr +// RecordResult updates the state of the relevant BlackHoleSuccessCounters for addr func (d *blackHoleDetector) RecordResult(addr ma.Multiaddr, success bool) { if d.readOnly || !manet.IsPublicAddr(addr) { return @@ -251,7 +251,7 @@ func (d *blackHoleDetector) RecordResult(addr ma.Multiaddr, success bool) { } } -func (d *blackHoleDetector) getFilterState(f *BlackHoleFilter) blackHoleState { +func (d *blackHoleDetector) getFilterState(f *BlackHoleSuccessCounter) blackHoleState { if d.readOnly { if f.State() != blackHoleStateAllowed { return blackHoleStateBlocked @@ -261,11 +261,11 @@ func (d *blackHoleDetector) getFilterState(f *BlackHoleFilter) blackHoleState { return f.HandleRequest() } -func (d *blackHoleDetector) trackMetrics(f *BlackHoleFilter) { +func (d *blackHoleDetector) trackMetrics(f *BlackHoleSuccessCounter) { if d.readOnly || d.mt == nil { return } // Track metrics only in non readOnly state info := f.info() - d.mt.UpdatedBlackHoleFilterState(info.name, info.state, info.nextProbeAfter, info.successFraction) + d.mt.UpdatedBlackHoleSuccessCounter(info.name, info.state, info.nextProbeAfter, info.successFraction) } diff --git a/p2p/net/swarm/black_hole_detector_test.go b/p2p/net/swarm/black_hole_detector_test.go index a07cc3c0d4..a38b43f4ce 100644 --- a/p2p/net/swarm/black_hole_detector_test.go +++ b/p2p/net/swarm/black_hole_detector_test.go @@ -8,9 +8,9 @@ import ( "github.com/stretchr/testify/require" ) -func TestBlackHoleFilterReset(t *testing.T) { +func TestBlackHoleSuccessCounterReset(t *testing.T) { n := 10 - bhf := &BlackHoleFilter{N: n, MinSuccesses: 2, Name: "test"} + bhf := &BlackHoleSuccessCounter{N: n, MinSuccesses: 2, Name: "test"} var i = 0 // calls up to n should be probing for i = 1; i <= n; i++ { @@ -55,7 +55,7 @@ func TestBlackHoleFilterReset(t *testing.T) { } } -func TestBlackHoleFilterSuccessFraction(t *testing.T) { +func TestBlackHoleSuccessCounterSuccessFraction(t *testing.T) { n := 10 tests := []struct { minSuccesses, successes int @@ -71,7 +71,7 @@ func TestBlackHoleFilterSuccessFraction(t *testing.T) { } for i, tc := range tests { t.Run(fmt.Sprintf("case-%d", i), func(t *testing.T) { - bhf := BlackHoleFilter{N: n, MinSuccesses: tc.minSuccesses} + bhf := BlackHoleSuccessCounter{N: n, MinSuccesses: tc.minSuccesses} for i := 0; i < tc.successes; i++ { bhf.RecordResult(true) } @@ -87,8 +87,8 @@ func TestBlackHoleFilterSuccessFraction(t *testing.T) { } func TestBlackHoleDetectorInApplicableAddress(t *testing.T) { - udpF := &BlackHoleFilter{N: 10, MinSuccesses: 5} - ipv6F := &BlackHoleFilter{N: 10, MinSuccesses: 5} + udpF := &BlackHoleSuccessCounter{N: 10, MinSuccesses: 5} + ipv6F := &BlackHoleSuccessCounter{N: 10, MinSuccesses: 5} bhd := &blackHoleDetector{udp: udpF, ipv6: ipv6F} addrs := []ma.Multiaddr{ ma.StringCast("/ip4/1.2.3.4/tcp/1234"), @@ -106,7 +106,7 @@ func TestBlackHoleDetectorInApplicableAddress(t *testing.T) { } func TestBlackHoleDetectorUDPDisabled(t *testing.T) { - ipv6F := &BlackHoleFilter{N: 10, MinSuccesses: 5} + ipv6F := &BlackHoleSuccessCounter{N: 10, MinSuccesses: 5} bhd := &blackHoleDetector{ipv6: ipv6F} publicAddr := ma.StringCast("/ip4/1.2.3.4/udp/1234/quic-v1") privAddr := ma.StringCast("/ip4/192.168.1.5/udp/1234/quic-v1") @@ -122,7 +122,7 @@ func TestBlackHoleDetectorUDPDisabled(t *testing.T) { } func TestBlackHoleDetectorIPv6Disabled(t *testing.T) { - udpF := &BlackHoleFilter{N: 10, MinSuccesses: 5} + udpF := &BlackHoleSuccessCounter{N: 10, MinSuccesses: 5} bhd := &blackHoleDetector{udp: udpF} publicAddr := ma.StringCast("/ip6/2001::1/tcp/1234") privAddr := ma.StringCast("/ip6/::1/tcp/1234") @@ -140,8 +140,8 @@ func TestBlackHoleDetectorIPv6Disabled(t *testing.T) { func TestBlackHoleDetectorProbes(t *testing.T) { bhd := &blackHoleDetector{ - udp: &BlackHoleFilter{N: 2, MinSuccesses: 1, Name: "udp"}, - ipv6: &BlackHoleFilter{N: 3, MinSuccesses: 1, Name: "ipv6"}, + udp: &BlackHoleSuccessCounter{N: 2, MinSuccesses: 1, Name: "udp"}, + ipv6: &BlackHoleSuccessCounter{N: 3, MinSuccesses: 1, Name: "ipv6"}, } udp6Addr := ma.StringCast("/ip6/2001::1/udp/1234/quic-v1") addrs := []ma.Multiaddr{udp6Addr} @@ -175,8 +175,8 @@ func TestBlackHoleDetectorAddrFiltering(t *testing.T) { makeBHD := func(udpBlocked, ipv6Blocked bool) *blackHoleDetector { bhd := &blackHoleDetector{ - udp: &BlackHoleFilter{N: 100, MinSuccesses: 10, Name: "udp"}, - ipv6: &BlackHoleFilter{N: 100, MinSuccesses: 10, Name: "ipv6"}, + udp: &BlackHoleSuccessCounter{N: 100, MinSuccesses: 10, Name: "udp"}, + ipv6: &BlackHoleSuccessCounter{N: 100, MinSuccesses: 10, Name: "ipv6"}, } for i := 0; i < 100; i++ { bhd.RecordResult(udp4Pub, !udpBlocked) @@ -213,8 +213,8 @@ func TestBlackHoleDetectorAddrFiltering(t *testing.T) { } func TestBlackHoleDetectorReadOnlyMode(t *testing.T) { - udpF := &BlackHoleFilter{N: 10, MinSuccesses: 5} - ipv6F := &BlackHoleFilter{N: 10, MinSuccesses: 5} + udpF := &BlackHoleSuccessCounter{N: 10, MinSuccesses: 5} + ipv6F := &BlackHoleSuccessCounter{N: 10, MinSuccesses: 5} bhd := &blackHoleDetector{udp: udpF, ipv6: ipv6F, readOnly: true} publicAddr := ma.StringCast("/ip4/1.2.3.4/udp/1234/quic-v1") privAddr := ma.StringCast("/ip6/::1/tcp/1234") diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index 81897be902..02cff1e881 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -112,20 +112,20 @@ func WithDialRanker(d network.DialRanker) Option { } } -// WithUDPBlackHoleFilter configures swarm to use the provided config for UDP black hole detection +// WithUDPBlackHoleSuccessCounter configures swarm to use the provided config for UDP black hole detection // n is the size of the sliding window used to evaluate black hole state // min is the minimum number of successes out of n required to not block requests -func WithUDPBlackHoleFilter(f *BlackHoleFilter) Option { +func WithUDPBlackHoleSuccessCounter(f *BlackHoleSuccessCounter) Option { return func(s *Swarm) error { s.udpBHF = f return nil } } -// WithIPv6BlackHoleFilter configures swarm to use the provided config for IPv6 black hole detection +// WithIPv6BlackHoleSuccessCounter configures swarm to use the provided config for IPv6 black hole detection // n is the size of the sliding window used to evaluate black hole state // min is the minimum number of successes out of n required to not block requests -func WithIPv6BlackHoleFilter(f *BlackHoleFilter) Option { +func WithIPv6BlackHoleSuccessCounter(f *BlackHoleSuccessCounter) Option { return func(s *Swarm) error { s.ipv6BHF = f return nil @@ -215,8 +215,8 @@ type Swarm struct { dialRanker network.DialRanker connectednessEventEmitter *connectednessEventEmitter - udpBHF *BlackHoleFilter - ipv6BHF *BlackHoleFilter + udpBHF *BlackHoleSuccessCounter + ipv6BHF *BlackHoleSuccessCounter bhd *blackHoleDetector readOnlyBHD bool } @@ -242,8 +242,8 @@ func NewSwarm(local peer.ID, peers peerstore.Peerstore, eventBus event.Bus, opts // A black hole is a binary property. On a network if UDP dials are blocked or there is // no IPv6 connectivity, all dials will fail. So a low success rate of 5 out 100 dials // is good enough. - udpBHF: &BlackHoleFilter{N: 100, MinSuccesses: 5, Name: "UDP"}, - ipv6BHF: &BlackHoleFilter{N: 100, MinSuccesses: 5, Name: "IPv6"}, + udpBHF: &BlackHoleSuccessCounter{N: 100, MinSuccesses: 5, Name: "UDP"}, + ipv6BHF: &BlackHoleSuccessCounter{N: 100, MinSuccesses: 5, Name: "IPv6"}, } s.conns.m = make(map[peer.ID][]*Conn) diff --git a/p2p/net/swarm/swarm_dial_test.go b/p2p/net/swarm/swarm_dial_test.go index ea8361f48c..f4c33170a9 100644 --- a/p2p/net/swarm/swarm_dial_test.go +++ b/p2p/net/swarm/swarm_dial_test.go @@ -364,7 +364,7 @@ func TestBlackHoledAddrBlocked(t *testing.T) { defer s.Close() n := 3 - s.bhd.ipv6 = &BlackHoleFilter{N: n, MinSuccesses: 1, Name: "IPv6"} + s.bhd.ipv6 = &BlackHoleSuccessCounter{N: n, MinSuccesses: 1, Name: "IPv6"} // All dials to this addr will fail. // manet.IsPublic is aggressive for IPv6 addresses. Use a NAT64 address. diff --git a/p2p/net/swarm/swarm_metrics.go b/p2p/net/swarm/swarm_metrics.go index b5c0f2e499..929f3f4946 100644 --- a/p2p/net/swarm/swarm_metrics.go +++ b/p2p/net/swarm/swarm_metrics.go @@ -85,7 +85,7 @@ var ( Buckets: []float64{0.001, 0.01, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 0.75, 1, 2}, }, ) - blackHoleFilterState = prometheus.NewGaugeVec( + blackHoleSuccessCounterState = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: metricNamespace, Name: "black_hole_filter_state", @@ -93,7 +93,7 @@ var ( }, []string{"name"}, ) - blackHoleFilterSuccessFraction = prometheus.NewGaugeVec( + blackHoleSuccessCounterSuccessFraction = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: metricNamespace, Name: "black_hole_filter_success_fraction", @@ -101,7 +101,7 @@ var ( }, []string{"name"}, ) - blackHoleFilterNextRequestAllowedAfter = prometheus.NewGaugeVec( + blackHoleSuccessCounterNextRequestAllowedAfter = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: metricNamespace, Name: "black_hole_filter_next_request_allowed_after", @@ -118,9 +118,9 @@ var ( connHandshakeLatency, dialsPerPeer, dialRankingDelay, - blackHoleFilterSuccessFraction, - blackHoleFilterState, - blackHoleFilterNextRequestAllowedAfter, + blackHoleSuccessCounterSuccessFraction, + blackHoleSuccessCounterState, + blackHoleSuccessCounterNextRequestAllowedAfter, } ) @@ -131,7 +131,7 @@ type MetricsTracer interface { FailedDialing(ma.Multiaddr, error, error) DialCompleted(success bool, totalDials int) DialRankingDelay(d time.Duration) - UpdatedBlackHoleFilterState(name string, state blackHoleState, nextProbeAfter int, successFraction float64) + UpdatedBlackHoleSuccessCounter(name string, state blackHoleState, nextProbeAfter int, successFraction float64) } type metricsTracer struct{} @@ -274,14 +274,14 @@ func (m *metricsTracer) DialRankingDelay(d time.Duration) { dialRankingDelay.Observe(d.Seconds()) } -func (m *metricsTracer) UpdatedBlackHoleFilterState(name string, state blackHoleState, +func (m *metricsTracer) UpdatedBlackHoleSuccessCounter(name string, state blackHoleState, nextProbeAfter int, successFraction float64) { tags := metricshelper.GetStringSlice() defer metricshelper.PutStringSlice(tags) *tags = append(*tags, name) - blackHoleFilterState.WithLabelValues(*tags...).Set(float64(state)) - blackHoleFilterSuccessFraction.WithLabelValues(*tags...).Set(successFraction) - blackHoleFilterNextRequestAllowedAfter.WithLabelValues(*tags...).Set(float64(nextProbeAfter)) + blackHoleSuccessCounterState.WithLabelValues(*tags...).Set(float64(state)) + blackHoleSuccessCounterSuccessFraction.WithLabelValues(*tags...).Set(successFraction) + blackHoleSuccessCounterNextRequestAllowedAfter.WithLabelValues(*tags...).Set(float64(nextProbeAfter)) } diff --git a/p2p/net/swarm/swarm_metrics_test.go b/p2p/net/swarm/swarm_metrics_test.go index e415c55fb8..45ce0c2e47 100644 --- a/p2p/net/swarm/swarm_metrics_test.go +++ b/p2p/net/swarm/swarm_metrics_test.go @@ -94,8 +94,8 @@ func TestMetricsNoAllocNoCover(t *testing.T) { "FailedDialing": func() { mt.FailedDialing(randItem(addrs), randItem(errors), randItem(errors)) }, "DialCompleted": func() { mt.DialCompleted(mrand.Intn(2) == 1, mrand.Intn(10)) }, "DialRankingDelay": func() { mt.DialRankingDelay(time.Duration(mrand.Intn(1e10))) }, - "UpdatedBlackHoleFilterState": func() { - mt.UpdatedBlackHoleFilterState( + "UpdatedBlackHoleSuccessCounter": func() { + mt.UpdatedBlackHoleSuccessCounter( randItem(bhfNames), randItem(bhfState), mrand.Intn(100), diff --git a/p2p/protocol/autonatv2/autonat_test.go b/p2p/protocol/autonatv2/autonat_test.go index 4fb6a7e0ea..a2c9f26b21 100644 --- a/p2p/protocol/autonatv2/autonat_test.go +++ b/p2p/protocol/autonatv2/autonat_test.go @@ -33,8 +33,8 @@ func newAutoNAT(t testing.TB, dialer host.Host, opts ...AutoNATOption) *AutoNAT dialer = bhost.NewBlankHost( swarmt.GenSwarm(t, swarmt.WithSwarmOpts( - swarm.WithUDPBlackHoleFilter(nil), - swarm.WithIPv6BlackHoleFilter(nil)))) + swarm.WithUDPBlackHoleSuccessCounter(nil), + swarm.WithIPv6BlackHoleSuccessCounter(nil)))) } an, err := New(h, dialer, opts...) if err != nil { diff --git a/p2p/protocol/autonatv2/client.go b/p2p/protocol/autonatv2/client.go index b2f6bf89bf..1d4f3b2819 100644 --- a/p2p/protocol/autonatv2/client.go +++ b/p2p/protocol/autonatv2/client.go @@ -287,13 +287,13 @@ func (ac *client) handleDialBack(s network.Stream) { } } -func areAddrsConsistent(local, external ma.Multiaddr) bool { - if local == nil || external == nil { +func areAddrsConsistent(connLocalAddr, dialedAddr ma.Multiaddr) bool { + if connLocalAddr == nil || dialedAddr == nil { return false } - localProtos := local.Protocols() - externalProtos := external.Protocols() + localProtos := connLocalAddr.Protocols() + externalProtos := dialedAddr.Protocols() if len(localProtos) != len(externalProtos) { return false } diff --git a/p2p/protocol/autonatv2/server.go b/p2p/protocol/autonatv2/server.go index 6e03ab727c..3a8b7ea08d 100644 --- a/p2p/protocol/autonatv2/server.go +++ b/p2p/protocol/autonatv2/server.go @@ -61,6 +61,7 @@ func (as *server) Start() { func (as *server) Close() { as.host.RemoveStreamHandler(DialProtocol) as.dialerHost.Close() + as.limiter.Close() } // handleDialRequest is the dial-request protocol stream handler @@ -157,7 +158,7 @@ func (as *server) handleDialRequest(s network.Stream) { nonce := msg.GetDialRequest().Nonce isDialDataRequired := as.dialDataRequestPolicy(s, dialAddr) - if !as.limiter.AcceptDialDataRequest(p) { + if isDialDataRequired && !as.limiter.AcceptDialDataRequest(p) { msg = pb.Message{ Msg: &pb.Message_DialResponse{ DialResponse: &pb.DialResponse{ @@ -283,6 +284,7 @@ type rateLimiter struct { DialDataRPM int mu sync.Mutex + closed bool reqs []entry peerReqs map[peer.ID][]time.Time dialDataReqs []time.Time @@ -302,6 +304,9 @@ type entry struct { func (r *rateLimiter) Accept(p peer.ID) bool { r.mu.Lock() defer r.mu.Unlock() + if r.closed { + return false + } if r.peerReqs == nil { r.peerReqs = make(map[peer.ID][]time.Time) r.ongoingReqs = make(map[peer.ID]struct{}) @@ -326,6 +331,9 @@ func (r *rateLimiter) Accept(p peer.ID) bool { func (r *rateLimiter) AcceptDialDataRequest(p peer.ID) bool { r.mu.Lock() defer r.mu.Unlock() + if r.closed { + return false + } if r.peerReqs == nil { r.peerReqs = make(map[peer.ID][]time.Time) r.ongoingReqs = make(map[peer.ID]struct{}) @@ -378,10 +386,18 @@ func (r *rateLimiter) cleanup(now time.Time) { func (r *rateLimiter) CompleteRequest(p peer.ID) { r.mu.Lock() defer r.mu.Unlock() - delete(r.ongoingReqs, p) } +func (r *rateLimiter) Close() { + r.mu.Lock() + defer r.mu.Unlock() + r.closed = true + r.peerReqs = nil + r.ongoingReqs = nil + r.dialDataReqs = nil +} + // amplificationAttackPrevention is a dialDataRequestPolicy which requests data when the peer's observed // IP address is different from the dial back IP address func amplificationAttackPrevention(s network.Stream, dialAddr ma.Multiaddr) bool { diff --git a/p2p/protocol/autonatv2/server_test.go b/p2p/protocol/autonatv2/server_test.go index d23d02c210..55df052ccb 100644 --- a/p2p/protocol/autonatv2/server_test.go +++ b/p2p/protocol/autonatv2/server_test.go @@ -4,6 +4,8 @@ import ( "context" "fmt" "math" + "sync" + "sync/atomic" "testing" "time" @@ -283,41 +285,56 @@ func TestRateLimiter(t *testing.T) { func TestRateLimiterStress(t *testing.T) { cl := test.NewMockClock() - for i := 0; i < 100; i++ { + for i := 0; i < 10; i++ { r := rateLimiter{RPM: 20 + i, PerPeerRPM: 10 + i, DialDataRPM: i, now: cl.Now} peers := make([]peer.ID, 10+i) for i := 0; i < len(peers); i++ { peers[i] = peer.ID(fmt.Sprintf("peer-%d", i)) } - peerSuccesses := make([]int, len(peers)) - success := 0 - dialDataSuccesses := 0 - for i := 0; i < 10*60; i++ { - for j, p := range peers { - if r.Accept(p) { - success++ - peerSuccesses[j]++ + peerSuccesses := make([]atomic.Int64, len(peers)) + var success, dialDataSuccesses atomic.Int64 + var wg sync.WaitGroup + for k := 0; k < 5; k++ { + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 2*60; i++ { + for j, p := range peers { + if r.Accept(p) { + success.Add(1) + peerSuccesses[j].Add(1) + } + if r.AcceptDialDataRequest(p) { + dialDataSuccesses.Add(1) + } + r.CompleteRequest(p) + } + cl.AdvanceBy(time.Second) } - if r.AcceptDialDataRequest(p) { - dialDataSuccesses++ - } - r.CompleteRequest(p) - } - cl.AdvanceBy(time.Second) + }() } - if success > 10*r.RPM || success < 9*r.RPM { - t.Fatalf("too many successes, RPM=%d", r.RPM) + wg.Wait() + if int(success.Load()) > 10*r.RPM || int(success.Load()) < 9*r.RPM { + t.Fatalf("invalid successes, %d, expected %d-%d", success.Load(), 9*r.RPM, 10*r.RPM) } - if dialDataSuccesses > 10*r.DialDataRPM || dialDataSuccesses < 9*r.DialDataRPM { - t.Fatalf("too may dial data successes, DialDataRPM=%d", r.DialDataRPM) + if int(dialDataSuccesses.Load()) > 10*r.DialDataRPM || int(dialDataSuccesses.Load()) < 9*r.DialDataRPM { + t.Fatalf("invalid dial data successes, %d expected %d-%d", dialDataSuccesses.Load(), 9*r.DialDataRPM, 10*r.DialDataRPM) } - for _, s := range peerSuccesses { + for i := range peerSuccesses { // We cannot check the lower bound because some peers would be hitting the global rpm limit - if s > 10*r.PerPeerRPM { + if int(peerSuccesses[i].Load()) > 10*r.PerPeerRPM { t.Fatalf("too many per peer successes, PerPeerRPM=%d", r.PerPeerRPM) } } + cl.AdvanceBy(1 * time.Minute) + require.True(t, r.Accept(peers[0])) + // Assert lengths to check that we are cleaning up correctly + require.Equal(t, len(r.reqs), 1) + require.Equal(t, len(r.peerReqs), 1) + require.Equal(t, len(r.peerReqs[peers[0]]), 1) + require.Equal(t, len(r.dialDataReqs), 0) + require.Equal(t, len(r.ongoingReqs), 1) } }