From 41808dfe60aa4a88decdf20721057d4a281924ec Mon Sep 17 00:00:00 2001 From: sukun Date: Wed, 28 Jun 2023 00:51:29 +0530 Subject: [PATCH] address review comments --- dashboards/swarm/swarm.json | 29 ++- p2p/net/swarm/black_hole_detector.go | 225 ++++++++++++--------- p2p/net/swarm/black_hole_detector_test.go | 226 +++++++++++----------- p2p/net/swarm/dial_worker.go | 4 + p2p/net/swarm/swarm.go | 24 +-- p2p/net/swarm/swarm_dial.go | 18 +- p2p/net/swarm/swarm_dial_test.go | 19 +- p2p/net/swarm/swarm_metrics.go | 4 +- 8 files changed, 295 insertions(+), 254 deletions(-) diff --git a/dashboards/swarm/swarm.json b/dashboards/swarm/swarm.json index 0c41bee76e..9d15e6f0ef 100644 --- a/dashboards/swarm/swarm.json +++ b/dashboards/swarm/swarm.json @@ -3071,13 +3071,18 @@ { "options": { "0": { - "color": "green", + "color": "blue", "index": 0, - "text": "Allowed" + "text": "Probing" }, "1": { - "color": "purple", + "color": "green", "index": 1, + "text": "Allowed" + }, + "2": { + "color": "purple", + "index": 2, "text": "Blocked" } }, @@ -3145,7 +3150,17 @@ "fixedColor": "purple", "mode": "fixed" }, - "mappings": [], + "mappings": [ + { + "options": { + "0": { + "index": 0, + "text": "-" + } + }, + "type": "value" + } + ], "thresholds": { "mode": "absolute", "steps": [ @@ -3169,7 +3184,7 @@ "colorMode": "value", "graphMode": "none", "justifyMode": "auto", - "orientation": "auto", + "orientation": "horizontal", "reduceOptions": { "calcs": [ "lastNotNull" @@ -3218,7 +3233,7 @@ }, { "color": "green", - "value": 1 + "value": 5 } ] } @@ -3302,6 +3317,6 @@ "timezone": "", "title": "libp2p Swarm", "uid": "a15PyhO4z", - "version": 6, + "version": 7, "weekStart": "" } \ No newline at end of file diff --git a/p2p/net/swarm/black_hole_detector.go b/p2p/net/swarm/black_hole_detector.go index 3c63868ac1..0c415080e0 100644 --- a/p2p/net/swarm/black_hole_detector.go +++ b/p2p/net/swarm/black_hole_detector.go @@ -1,52 +1,70 @@ package swarm import ( + "fmt" "sync" ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" ) -type outcome int +type blackHoleState int const ( - outcomeSuccess outcome = iota - outcomeFailed + blackHoleStateProbing blackHoleState = iota + blackHoleStateAllowed + blackHoleStateBlocked ) -type blackHoleState int +func (st blackHoleState) String() string { + switch st { + case blackHoleStateProbing: + return "Probing" + case blackHoleStateAllowed: + return "Allowed" + case blackHoleStateBlocked: + return "Blocked" + default: + return fmt.Sprintf("Unknown %d", st) + } +} + +type blackHoleResult int const ( - blackHoleStateAllowed blackHoleState = iota - blackHoleStateBlocked + blackHoleResultAllowed blackHoleResult = iota + blackHoleResultProbing + blackHoleResultBlocked ) -// blackHoleFilter provides black hole filtering logic for dials. On detecting a black holed -// network environment, subsequent dials are blocked and only 1 dial every n requests is allowed. -// This should be used in conjunction with an UDP or IPv6 address filter to detect UDP or -// IPv6 black hole. -// Requests are blocked if the success fraction in the last n outcomes is less than -// minSuccessFraction. If a request succeeds in Blocked state, the filter state is reset and n -// subsequent requests are allowed before reevaluating black hole status. Evaluating over n -// outcomes avoids situations where a dial was cancelled because a competing dial succeeded, -// the address was unreachable, and other false negatives. +// blackHoleFilter provides black hole filtering for dials. This filter should be used in +// concert with a UDP of IPv6 address filter to detect UDP or IPv6 black hole. In a black +// holed environments dial requests are blocked and only periodic probes to check the +// state of the black hole are allowed. +// +// Requests are blocked if the number of successes in the last n dials is less than +// minSuccesses. If a request succeeds in Blocked state, the filter state is reset and n +// subsequent requests are allowed before reevaluating black hole state. Dials cancelled +// when some other concurrent dial succeeded are counted as failures. A sufficiently large +// n prevents false negatives in such cases. type blackHoleFilter struct { - // n is the minimum number of completed dials required before we start blocking. - // Every nth request is allowed irrespective of the status of the detector. + // n serves the dual purpose of being the minimum number of requests after which we + // probe the state of the black hole in blocked state and the minimum number of + // completed dials required before evaluating black hole state. n int - // minSuccessFraction is the minimum success fraction required to allow dials. - minSuccessFraction float64 + // minSuccesses is the minimum number of Success required in the last n dials + // to consider we are not blocked. + minSuccesses int // name for the detector. name string - // requests counts number of dial requests up to n. Resets to 0 every nth request. + // requests counts number of dial requests to peers. We handle request at a peer + // level and record results at individual address dial level. requests int - // outcomes of the last `n` allowed dials - outcomes []outcome + // dialResults of the last `n` dials. A successful dial is true. + dialResults []bool // successes is the count of successful dials in outcomes successes int - // failures is the count of failed dials in outcomes - failures int // state is the current state of the detector state blackHoleState @@ -54,14 +72,10 @@ type blackHoleFilter struct { metricsTracer MetricsTracer } -// RecordOutcome records the outcome of a dial. A successful dial will change the state +// RecordResult records the outcome of a dial. A successful dial will change the state // of the filter to Allowed. A failed dial only blocks subsequent requests if the success // fraction over the last n outcomes is less than the minSuccessFraction of the filter. -func (b *blackHoleFilter) RecordOutcome(success bool) { - if b == nil { - return - } - +func (b *blackHoleFilter) RecordResult(success bool) { b.mu.Lock() defer b.mu.Unlock() @@ -75,66 +89,58 @@ func (b *blackHoleFilter) RecordOutcome(success bool) { if success { b.successes++ - b.outcomes = append(b.outcomes, outcomeSuccess) - } else { - b.failures++ - b.outcomes = append(b.outcomes, outcomeFailed) } + b.dialResults = append(b.dialResults, success) - if len(b.outcomes) > b.n { - if b.outcomes[0] == outcomeSuccess { + if len(b.dialResults) > b.n { + if b.dialResults[0] { b.successes-- - } else { - b.failures-- } - b.outcomes = b.outcomes[1 : b.n+1] + b.dialResults = b.dialResults[1:] } b.updateState() b.trackMetrics() } -func (b *blackHoleFilter) IsAllowed() (state blackHoleState, isAllowed bool) { - if b == nil { - return blackHoleStateAllowed, true - } - +// HandleRequest returns the result of applying the black hole filter for the request. +func (b *blackHoleFilter) HandleRequest() blackHoleResult { b.mu.Lock() defer b.mu.Unlock() + b.requests++ - if b.requests == b.n { - b.requests = 0 - } + b.trackMetrics() - return b.state, (b.state == blackHoleStateAllowed) || (b.requests == 0) + + if b.state == blackHoleStateAllowed { + return blackHoleResultAllowed + } else if b.state == blackHoleStateProbing || b.requests%b.n == 0 { + return blackHoleResultProbing + } else { + return blackHoleResultBlocked + } } func (b *blackHoleFilter) reset() { b.successes = 0 - b.failures = 0 - b.outcomes = b.outcomes[:0] + b.dialResults = b.dialResults[:0] + b.requests = 0 b.updateState() } func (b *blackHoleFilter) updateState() { st := b.state - successFraction := 0.0 - if len(b.outcomes) < b.n { + + if len(b.dialResults) < b.n { + b.state = blackHoleStateProbing + } else if b.successes >= b.minSuccesses { b.state = blackHoleStateAllowed } else { - successFraction = float64(b.successes) / float64(b.successes+b.failures) - if successFraction >= b.minSuccessFraction { - b.state = blackHoleStateAllowed - } else { - b.state = blackHoleStateBlocked - } + b.state = blackHoleStateBlocked } + if st != b.state { - if b.state == blackHoleStateAllowed { - log.Debugf("%s blackHoleDetector state changed to Allowed", b.name) - } else { - log.Debugf("%s blackHoleDetector state changed to Blocked. Success fraction is %0.3f", b.name, successFraction) - } + log.Debugf("%s blackHoleDetector state changed from %s to %s", b.name, st, b.state) } } @@ -142,67 +148,110 @@ func (b *blackHoleFilter) trackMetrics() { if b.metricsTracer == nil { return } + + nextRequestAllowedAfter := 0 + if b.state == blackHoleStateBlocked { + nextRequestAllowedAfter = b.n - (b.requests % b.n) + } + successFraction := 0.0 - if b.successes+b.failures != 0 { - successFraction = float64(b.successes) / float64(b.successes+b.failures) + if len(b.dialResults) > 0 { + successFraction = float64(b.successes) / float64(len(b.dialResults)) } + b.metricsTracer.UpdatedBlackHoleFilterState( b.name, b.state, - b.n-b.requests, + nextRequestAllowedAfter, successFraction, ) } // blackHoleDetector provides UDP and IPv6 black hole detection using a `blackHoleFilter` -// for each. For details of the black hole detection logic see `blackHoleFilter` +// for each. For details of the black hole detection logic see `blackHoleFilter`. +// +// black hole filtering is done at a peer dial level to ensure that periodic probes to +// detect change of the black hole state are actually dialed and are not skipped +// because of dial prioritisation logic. type blackHoleDetector struct { udp, ipv6 *blackHoleFilter } -func (d *blackHoleDetector) IsAllowed(addr ma.Multiaddr) bool { - if !manet.IsPublicAddr(addr) { - return true +// FilterAddrs filters the peer's addresses removing black holed addresses +func (d *blackHoleDetector) FilterAddrs(addrs []ma.Multiaddr) []ma.Multiaddr { + hasUDP, hasIPv6 := false, false + for _, a := range addrs { + if !manet.IsPublicAddr(a) { + continue + } + if isProtocolAddr(a, ma.P_UDP) { + hasUDP = true + } + if isProtocolAddr(a, ma.P_IP6) { + hasIPv6 = true + } } - udpState, udpAllowed := blackHoleStateAllowed, true - if d.udp != nil && isProtocolAddr(addr, ma.P_UDP) { - udpState, udpAllowed = d.udp.IsAllowed() + udpRes := blackHoleResultAllowed + if d.udp != nil && hasUDP { + udpRes = d.udp.HandleRequest() } - ipv6State, ipv6Allowed := blackHoleStateAllowed, true - if d.ipv6 != nil && isProtocolAddr(addr, ma.P_IP6) { - ipv6State, ipv6Allowed = d.ipv6.IsAllowed() + ipv6Res := blackHoleResultAllowed + if d.ipv6 != nil && hasIPv6 { + ipv6Res = d.ipv6.HandleRequest() } - // Allow all probes irrespective of the state of the other filter - if (udpState == blackHoleStateBlocked && udpAllowed) || - (ipv6State == blackHoleStateBlocked && ipv6Allowed) { - return true - } - return (udpAllowed && ipv6Allowed) + return ma.FilterAddrs( + addrs, + func(a ma.Multiaddr) bool { + if !manet.IsPublicAddr(a) { + return true + } + // allow all UDP addresses while probing irrespective of IPv6 black hole state + if udpRes == blackHoleResultProbing && isProtocolAddr(a, ma.P_UDP) { + return true + } + // allow all IPv6 addresses while probing irrespective of UDP black hole state + if ipv6Res == blackHoleResultProbing && isProtocolAddr(a, ma.P_IP6) { + return true + } + + if udpRes == blackHoleResultBlocked && isProtocolAddr(a, ma.P_UDP) { + return false + } + if ipv6Res == blackHoleResultBlocked && isProtocolAddr(a, ma.P_IP6) { + return false + } + return true + }, + ) } -// RecordOutcome updates the state of the relevant `blackHoleFilter` for addr -func (d *blackHoleDetector) RecordOutcome(addr ma.Multiaddr, success bool) { +// RecordResult updates the state of the relevant `blackHoleFilter`s for addr +func (d *blackHoleDetector) RecordResult(addr ma.Multiaddr, success bool) { if !manet.IsPublicAddr(addr) { return } if d.udp != nil && isProtocolAddr(addr, ma.P_UDP) { - d.udp.RecordOutcome(success) + d.udp.RecordResult(success) } if d.ipv6 != nil && isProtocolAddr(addr, ma.P_IP6) { - d.ipv6.RecordOutcome(success) + d.ipv6.RecordResult(success) } } func newBlackHoleDetector(detectUDP, detectIPv6 bool, mt MetricsTracer) *blackHoleDetector { d := &blackHoleDetector{} + + // A black hole is a binary property. On a network if UDP dials are blocked or there is + // no IPv6 connectivity, all dials will fail. So a low success rate of 5 out 100 dials + // is good enough. if detectUDP { - d.udp = &blackHoleFilter{n: 100, minSuccessFraction: 0.01, name: "UDP", metricsTracer: mt} + d.udp = &blackHoleFilter{n: 100, minSuccesses: 5, name: "UDP", metricsTracer: mt} } if detectIPv6 { - d.ipv6 = &blackHoleFilter{n: 100, minSuccessFraction: 0.01, name: "IPv6", metricsTracer: mt} + d.ipv6 = &blackHoleFilter{n: 100, minSuccesses: 5, name: "IPv6", metricsTracer: mt} } return d } diff --git a/p2p/net/swarm/black_hole_detector_test.go b/p2p/net/swarm/black_hole_detector_test.go index e53a800241..564fc07767 100644 --- a/p2p/net/swarm/black_hole_detector_test.go +++ b/p2p/net/swarm/black_hole_detector_test.go @@ -1,171 +1,179 @@ package swarm import ( + "fmt" "testing" ma "github.com/multiformats/go-multiaddr" + "github.com/stretchr/testify/require" ) func TestBlackHoleFilterReset(t *testing.T) { n := 10 - bhd := &blackHoleFilter{n: n, minSuccessFraction: 0.05, name: "test"} + bhf := &blackHoleFilter{n: n, minSuccesses: 2, name: "test"} var i = 0 - // calls up to threshold should be allowed + // calls up to n should be probing for i = 1; i <= n; i++ { - if _, isAllowed := bhd.IsAllowed(); !isAllowed { - t.Fatalf("expected calls up to minDials to be allowed") + if bhf.HandleRequest() != blackHoleResultProbing { + t.Fatalf("expected calls up to n to be probes") } - bhd.RecordOutcome(false) + bhf.RecordResult(false) } - // after threshold calls every nth call should be allowed + + // after threshold calls every nth call should be a probe for i = n + 1; i < 42; i++ { - _, isAllowed := bhd.IsAllowed() - if (i%n == 0 && !isAllowed) || (i%n != 0 && isAllowed) { - t.Fatalf("expected every nth dial to be allowed") + result := bhf.HandleRequest() + if (i%n == 0 && result != blackHoleResultProbing) || (i%n != 0 && result != blackHoleResultBlocked) { + t.Fatalf("expected every nth dial to be a probe") } } - bhd.RecordOutcome(true) - // check if calls up to threshold are allowed after success + bhf.RecordResult(true) + // check if calls up to n are probes again for i = 0; i < n; i++ { - if _, isAllowed := bhd.IsAllowed(); !isAllowed { + if bhf.HandleRequest() != blackHoleResultProbing { t.Fatalf("expected black hole detector state to reset after success") } - bhd.RecordOutcome(false) + bhf.RecordResult(false) } - // next call should be refused - if _, isAllowed := bhd.IsAllowed(); isAllowed { + // next call should be blocked + if bhf.HandleRequest() != blackHoleResultBlocked { t.Fatalf("expected dial to be blocked") } } -func TestBlackHoleDetector(t *testing.T) { +func TestBlackHoleFilterSuccessFraction(t *testing.T) { n := 10 - bhd := &blackHoleFilter{n: n, minSuccessFraction: 0.4, name: "test"} - var i = 0 - // 5 success and 5 fails - for i = 1; i <= 5; i++ { - bhd.RecordOutcome(true) - } - for i = 1; i <= 5; i++ { - bhd.RecordOutcome(false) - } - - if _, isAllowed := bhd.IsAllowed(); !isAllowed { - t.Fatalf("expected dial to be allowed") - } - // 4 success and 6 fails - bhd.RecordOutcome(false) - - if _, isAllowed := bhd.IsAllowed(); !isAllowed { - t.Fatalf("expected dial to be allowed") - } - // 3 success and 7 fails - bhd.RecordOutcome(false) - - // should be blocked - if _, isAllowed := bhd.IsAllowed(); isAllowed { - t.Fatalf("expected dial to be blocked") - } - - bhd.RecordOutcome(true) - // 5 success and 5 fails - for i = 1; i <= 5; i++ { - bhd.RecordOutcome(true) - } - for i = 1; i <= 5; i++ { - bhd.RecordOutcome(false) - } - - if _, isAllowed := bhd.IsAllowed(); !isAllowed { - t.Fatalf("expected dial to be allowed") - } - // 4 success and 6 fails - bhd.RecordOutcome(false) - - if _, isAllowed := bhd.IsAllowed(); !isAllowed { - t.Fatalf("expected dial to be allowed") - } - // 3 success and 7 fails - bhd.RecordOutcome(false) - - // should be blocked - if _, isAllowed := bhd.IsAllowed(); isAllowed { - t.Fatalf("expected dial to be blocked") + tests := []struct { + minSuccesses, successes int + result blackHoleResult + }{ + {minSuccesses: 5, successes: 5, result: blackHoleResultAllowed}, + {minSuccesses: 3, successes: 3, result: blackHoleResultAllowed}, + {minSuccesses: 5, successes: 4, result: blackHoleResultBlocked}, + {minSuccesses: 5, successes: 7, result: blackHoleResultAllowed}, + {minSuccesses: 3, successes: 1, result: blackHoleResultBlocked}, + {minSuccesses: 0, successes: 0, result: blackHoleResultAllowed}, + {minSuccesses: 10, successes: 10, result: blackHoleResultAllowed}, + } + for i, tc := range tests { + t.Run(fmt.Sprintf("case-%d", i), func(t *testing.T) { + bhf := blackHoleFilter{n: n, minSuccesses: tc.minSuccesses} + for i := 0; i < tc.successes; i++ { + bhf.RecordResult(true) + } + for i := 0; i < n-tc.successes; i++ { + bhf.RecordResult(false) + } + got := bhf.HandleRequest() + if got != tc.result { + t.Fatalf("expected %d got %d", tc.result, got) + } + }) } - } func TestBlackHoleDetectorInApplicableAddress(t *testing.T) { bhd := newBlackHoleDetector(true, true, nil) - addr := ma.StringCast("/ip4/127.0.0.1/tcp/1234") + addrs := []ma.Multiaddr{ + ma.StringCast("/ip4/1.2.3.4/tcp/1234"), + ma.StringCast("/ip4/1.2.3.4/tcp/1233"), + ma.StringCast("/ip6/::1/udp/1234/quic-v1"), + ma.StringCast("/ip4/192.168.1.5/udp/1234/quic-v1"), + } for i := 0; i < 1000; i++ { - if !bhd.IsAllowed(addr) { - t.Fatalf("expect dials to inapplicable address to always be allowed") + filteredAddrs := bhd.FilterAddrs(addrs) + require.ElementsMatch(t, addrs, filteredAddrs) + for j := 0; j < len(addrs); j++ { + bhd.RecordResult(addrs[j], false) } - bhd.RecordOutcome(addr, false) } } -func TestBlackHoleDetectorUDP(t *testing.T) { - bhd := newBlackHoleDetector(true, true, nil) - addr := ma.StringCast("/ip4/1.2.3.4/udp/1234") - for i := 0; i < 100; i++ { - bhd.RecordOutcome(addr, false) - } - if bhd.IsAllowed(addr) { - t.Fatalf("expect dial to be be blocked") - } - - bhd = newBlackHoleDetector(false, true, nil) +func TestBlackHoleDetectorUDPDisabled(t *testing.T) { + bhd := newBlackHoleDetector(false, true, nil) + publicAddr := ma.StringCast("/ip4/1.2.3.4/udp/1234/quic-v1") + privAddr := ma.StringCast("/ip4/192.168.1.5/udp/1234/quic-v1") for i := 0; i < 100; i++ { - bhd.RecordOutcome(addr, false) - } - if !bhd.IsAllowed(addr) { - t.Fatalf("expected dial to be be allowed when UDP detection is disabled") + bhd.RecordResult(publicAddr, false) } + addrs := []ma.Multiaddr{publicAddr, privAddr} + require.ElementsMatch(t, addrs, bhd.FilterAddrs(addrs)) } -func TestBlackHoleDetectorIPv6(t *testing.T) { - bhd := newBlackHoleDetector(true, true, nil) - addr := ma.StringCast("/ip6/1::1/tcp/1234") - for i := 0; i < 100; i++ { - bhd.RecordOutcome(addr, false) - } - if bhd.IsAllowed(addr) { - t.Fatalf("expect dial to be be blocked") - } - - bhd = newBlackHoleDetector(true, false, nil) +func TestBlackHoleDetectorIPv6Disabled(t *testing.T) { + bhd := newBlackHoleDetector(true, false, nil) + publicAddr := ma.StringCast("/ip6/1::1/tcp/1234") + privAddr := ma.StringCast("/ip6/::1/tcp/1234") + addrs := []ma.Multiaddr{publicAddr, privAddr} for i := 0; i < 100; i++ { - bhd.RecordOutcome(addr, false) - } - if !bhd.IsAllowed(addr) { - t.Fatalf("expected dial to be be allowed when IPv6 detection is disabled") + bhd.RecordResult(publicAddr, false) } + require.ElementsMatch(t, addrs, bhd.FilterAddrs(addrs)) } func TestBlackHoleDetectorProbes(t *testing.T) { bhd := &blackHoleDetector{ - udp: &blackHoleFilter{n: 2, minSuccessFraction: 0.5}, - ipv6: &blackHoleFilter{n: 3, minSuccessFraction: 0.5}, + udp: &blackHoleFilter{n: 2, minSuccesses: 1, name: "udp"}, + ipv6: &blackHoleFilter{n: 3, minSuccesses: 1, name: "ipv6"}, } udp6Addr := ma.StringCast("/ip6/1::1/udp/1234/quic-v1") + addrs := []ma.Multiaddr{udp6Addr} for i := 0; i < 3; i++ { - bhd.RecordOutcome(udp6Addr, false) + bhd.RecordResult(udp6Addr, false) } for i := 1; i < 100; i++ { - isAllowed := bhd.IsAllowed(udp6Addr) + filteredAddrs := bhd.FilterAddrs(addrs) if i%2 == 0 || i%3 == 0 { - if !isAllowed { + if len(filteredAddrs) == 0 { t.Fatalf("expected probe to be allowed irrespective of the state of other black hole filter") } } else { - if isAllowed { - t.Fatalf("expected dial to be blocked") + if len(filteredAddrs) != 0 { + t.Fatalf("expected dial to be blocked %s", filteredAddrs) } } } } + +func TestBlackHoleDetectorAddrFiltering(t *testing.T) { + udp6Pub := ma.StringCast("/ip6/1::1/udp/1234/quic-v1") + udp6Pri := ma.StringCast("/ip6/::1/udp/1234/quic-v1") + upd4Pub := ma.StringCast("/ip4/1.2.3.4/udp/1234/quic-v1") + udp4Pri := ma.StringCast("/ip4/192.168.1.5/udp/1234/quic-v1") + tcp6Pub := ma.StringCast("/ip6/1::1/tcp/1234/quic-v1") + tcp6Pri := ma.StringCast("/ip6/::1/tcp/1234/quic-v1") + tcp4Pub := ma.StringCast("/ip4/1.2.3.4/tcp/1234/quic-v1") + tcp4Pri := ma.StringCast("/ip4/192.168.1.5/tcp/1234/quic-v1") + + makeBHD := func(udpBlocked, ipv6Blocked bool) *blackHoleDetector { + bhd := &blackHoleDetector{ + udp: &blackHoleFilter{n: 100, minSuccesses: 10, name: "udp"}, + ipv6: &blackHoleFilter{n: 100, minSuccesses: 10, name: "ipv6"}, + } + for i := 0; i < 100; i++ { + bhd.RecordResult(upd4Pub, !udpBlocked) + } + for i := 0; i < 100; i++ { + bhd.RecordResult(tcp6Pub, !ipv6Blocked) + } + return bhd + } + + allInput := []ma.Multiaddr{udp6Pub, udp6Pri, upd4Pub, udp4Pri, tcp6Pub, tcp6Pri, + tcp4Pub, tcp4Pri} + + udpBlockedOutput := []ma.Multiaddr{udp6Pri, udp4Pri, tcp6Pub, tcp6Pri, tcp4Pub, tcp4Pri} + bhd := makeBHD(true, false) + require.ElementsMatch(t, udpBlockedOutput, bhd.FilterAddrs(allInput)) + + ip6BlockedOutput := []ma.Multiaddr{udp6Pri, upd4Pub, udp4Pri, tcp6Pri, tcp4Pub, tcp4Pri} + bhd = makeBHD(false, true) + require.ElementsMatch(t, ip6BlockedOutput, bhd.FilterAddrs(allInput)) + + bothBlockedOutput := []ma.Multiaddr{udp6Pri, udp4Pri, tcp6Pri, tcp4Pub, tcp4Pri} + bhd = makeBHD(true, true) + require.ElementsMatch(t, bothBlockedOutput, bhd.FilterAddrs(allInput)) +} diff --git a/p2p/net/swarm/dial_worker.go b/p2p/net/swarm/dial_worker.go index a8252c9680..0334ac863e 100644 --- a/p2p/net/swarm/dial_worker.go +++ b/p2p/net/swarm/dial_worker.go @@ -363,7 +363,11 @@ loop: // we only add backoff if there has not been a successful connection // for consistency with the old dialer behavior. w.s.backf.AddBackoff(w.peer, res.Addr) + } else if res.Err == ErrDialRefusedBlackHole { + log.Errorf("SWARM BUG: unexpected ErrDialRefusedBlackHole while dialing peer %s to addr %s", + w.peer, res.Addr) } + w.dispatchError(ad, res.Err) // Only schedule next dial on error. // If we scheduleNextDial on success, we will end up making one dial more than diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index 65f3294837..416bfd8cda 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -108,24 +108,6 @@ func WithDialRanker(d network.DialRanker) Option { } } -// WithNoIPv6BlackHoleDetection configures swarm to not do any black hole detection for -// IPv6 addresses -func WithNoIPv6BlackHoleDetection() Option { - return func(s *Swarm) error { - s.disableIPv6BlackHoleDetection = true - return nil - } -} - -// WithNoUDPBlackHoleDetection configures swarm to not do any black hole detection for -// UDP addresses -func WithNoUDPBlackHoleDetection() Option { - return func(s *Swarm) error { - s.disableUDPBlackHoleDetection = true - return nil - } -} - // Swarm is a connection muxer, allowing connections to other peers to // be opened and closed, while still using the same Chan for all // communication. The Chan sends/receives Messages, which note the @@ -192,9 +174,7 @@ type Swarm struct { dialRanker network.DialRanker - disableIPv6BlackHoleDetection bool - disableUDPBlackHoleDetection bool - bhd *blackHoleDetector + bhd *blackHoleDetector } // NewSwarm constructs a Swarm. @@ -235,7 +215,7 @@ func NewSwarm(local peer.ID, peers peerstore.Peerstore, eventBus event.Bus, opts s.limiter = newDialLimiter(s.dialAddr) s.backf.init(s.ctx) - s.bhd = newBlackHoleDetector(!s.disableUDPBlackHoleDetection, !s.disableIPv6BlackHoleDetection, s.metricsTracer) + s.bhd = newBlackHoleDetector(true, true, s.metricsTracer) return s, nil } diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index 91b3e866a9..83799f56f7 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -396,15 +396,6 @@ func (s *Swarm) dialNextAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr, } } - // Check if dial to the address is black holed. - // This is the best place to have this check since we want to ensure that periodic - // probes allowed by the black hole detector are actually dialed. If this check is - // done before the dial prioritisation logic, we might not dial the address because - // a higher priority address succeeded. - if !s.bhd.IsAllowed(addr) { - return ErrDialRefusedBlackHole - } - // start the dial s.limitedDial(ctx, p, addr, resch) @@ -446,9 +437,13 @@ func (s *Swarm) filterKnownUndialables(p peer.ID, addrs []ma.Multiaddr) []ma.Mul // filter addresses we cannot dial addrs = ma.FilterAddrs(addrs, s.canDial) + // filter low priority addresses among the addresses we can dial addrs = filterLowPriorityAddresses(addrs) + // remove black holed addrs + addrs = s.bhd.FilterAddrs(addrs) + return ma.FilterAddrs(addrs, func(addr ma.Multiaddr) bool { return !ma.Contains(ourAddrs, addr) }, // TODO: Consider allowing link-local addresses @@ -497,7 +492,10 @@ func (s *Swarm) dialAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr) (tra start := time.Now() connC, err := tpt.Dial(ctx, addr, p) - s.bhd.RecordOutcome(addr, err == nil) + // We're recording any error as a failure here. + // Notably, this also applies to cancelations (i.e. if another dial attempt was faster). + // This is ok since the black hole detector uses a very low threshold (5%). + s.bhd.RecordResult(addr, err == nil) if err != nil { if s.metricsTracer != nil { diff --git a/p2p/net/swarm/swarm_dial_test.go b/p2p/net/swarm/swarm_dial_test.go index bfe84c7fdb..2f6b3f8c4d 100644 --- a/p2p/net/swarm/swarm_dial_test.go +++ b/p2p/net/swarm/swarm_dial_test.go @@ -340,10 +340,10 @@ func TestBlackHoledAddrBlocked(t *testing.T) { defer s.Close() n := 3 - s.bhd.udp = &blackHoleFilter{n: n, minSuccessFraction: 0.5, name: "UDP"} + s.bhd.ipv6 = &blackHoleFilter{n: n, minSuccesses: 1, name: "IPv6"} // all dials to the address will fail. RFC6666 Discard Prefix - addr := ma.StringCast("/ip6/0100::1/udp/54321/quic-v1") + addr := ma.StringCast("/ip6/0100::1/tcp/54321/") p, err := test.RandPeerID() if err != nil { @@ -369,18 +369,7 @@ func TestBlackHoledAddrBlocked(t *testing.T) { if conn != nil { t.Fatalf("expected dial to be blocked") } - dialError, ok := err.(*DialError) - if !ok { - t.Fatalf("expected to receive an error of type *DialError, got %T", err) - } - isBlackHoleErr := false - for _, err := range dialError.DialErrors { - if err.Cause == ErrDialRefusedBlackHole { - isBlackHoleErr = true - break - } - } - if !isBlackHoleErr { - t.Fatalf("expected to receive ErrDialRefusedBlackHole %s", err) + if err != ErrNoGoodAddresses { + t.Fatalf("expected to receive an error of type *DialError, got %s of type %T", err, err) } } diff --git a/p2p/net/swarm/swarm_metrics.go b/p2p/net/swarm/swarm_metrics.go index dfa0698b3e..28564e9e54 100644 --- a/p2p/net/swarm/swarm_metrics.go +++ b/p2p/net/swarm/swarm_metrics.go @@ -273,7 +273,5 @@ func (m *metricsTracer) UpdatedBlackHoleFilterState(name string, state blackHole blackHoleFilterState.WithLabelValues(*tags...).Set(float64(state)) blackHoleFilterSuccessFraction.WithLabelValues(*tags...).Set(successFraction) - if state == blackHoleStateBlocked { - blackHoleFilterNextRequestAllowedAfter.WithLabelValues(*tags...).Set(float64(nextProbeAfter)) - } + blackHoleFilterNextRequestAllowedAfter.WithLabelValues(*tags...).Set(float64(nextProbeAfter)) }