Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Jun 16, 2023
1 parent 5c1b358 commit bbdcb0a
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 83 deletions.
94 changes: 51 additions & 43 deletions p2p/net/swarm/black_hole_detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,19 @@ import (
manet "github.com/multiformats/go-multiaddr/net"
)

type outcome int
type blackHoleState int

const (
outcomeSuccess outcome = iota
outcomeFailed
blackHoleStateAllowed blackHoleState = iota
blackHoleStateBlocked
)

type blackHoleState int
type blackHoleResult int

const (
blackHoleStateAllowed blackHoleState = iota
blackHoleStateBlocked
blackHoleResultAllowed blackHoleResult = iota
blackHoleResultProbing
blackHoleResultBlocked
)

// blackHoleFilter provides black hole filtering logic for dials. On detecting a black holed
Expand All @@ -41,8 +42,8 @@ type blackHoleFilter struct {

// requests counts number of dial requests up to n. Resets to 0 every nth request.
requests int
// outcomes of the last `n` allowed dials
outcomes []outcome
// dialResults of the last `n` allowed dials. success is true.
dialResults []bool
// successes is the count of successful dials in outcomes
successes int
// failures is the count of failed dials in outcomes
Expand All @@ -54,14 +55,10 @@ type blackHoleFilter struct {
metricsTracer MetricsTracer
}

// RecordOutcome records the outcome of a dial. A successful dial will change the state
// RecordResult records the outcome of a dial. A successful dial will change the state
// of the filter to Allowed. A failed dial only blocks subsequent requests if the success
// fraction over the last n outcomes is less than the minSuccessFraction of the filter.
func (b *blackHoleFilter) RecordOutcome(success bool) {
if b == nil {
return
}

func (b *blackHoleFilter) RecordResult(success bool) {
b.mu.Lock()
defer b.mu.Unlock()

Expand All @@ -75,51 +72,54 @@ func (b *blackHoleFilter) RecordOutcome(success bool) {

if success {
b.successes++
b.outcomes = append(b.outcomes, outcomeSuccess)
} else {
b.failures++
b.outcomes = append(b.outcomes, outcomeFailed)
}
b.dialResults = append(b.dialResults, success)

if len(b.outcomes) > b.n {
if b.outcomes[0] == outcomeSuccess {
if len(b.dialResults) > b.n {
if b.dialResults[0] {
b.successes--
} else {
b.failures--
}
b.outcomes = b.outcomes[1 : b.n+1]
b.dialResults = b.dialResults[1 : b.n+1]
}

b.updateState()
b.trackMetrics()
}

func (b *blackHoleFilter) IsAllowed() (state blackHoleState, isAllowed bool) {
if b == nil {
return blackHoleStateAllowed, true
}

// HandleRequest handles a new dial request for the filter. It returns a
func (b *blackHoleFilter) HandleRequest() blackHoleResult {
b.mu.Lock()
defer b.mu.Unlock()

b.requests++
if b.requests == b.n {
b.requests = 0
}

b.trackMetrics()
return b.state, (b.state == blackHoleStateAllowed) || (b.requests == 0)

if b.state == blackHoleStateAllowed {
return blackHoleResultAllowed
} else if b.requests%b.n == 0 {
return blackHoleResultProbing
} else {
return blackHoleResultBlocked
}
}

func (b *blackHoleFilter) reset() {
b.successes = 0
b.failures = 0
b.outcomes = b.outcomes[:0]
b.dialResults = b.dialResults[:0]
b.requests = 0
b.updateState()
}

func (b *blackHoleFilter) updateState() {
st := b.state
successFraction := 0.0
if len(b.outcomes) < b.n {
if len(b.dialResults) < b.n {
b.state = blackHoleStateAllowed
} else {
successFraction = float64(b.successes) / float64(b.successes+b.failures)
Expand All @@ -146,10 +146,15 @@ func (b *blackHoleFilter) trackMetrics() {
if b.successes+b.failures != 0 {
successFraction = float64(b.successes) / float64(b.successes+b.failures)
}

nextRequestAllowedAfter := 0
if b.state == blackHoleStateBlocked {
nextRequestAllowedAfter = b.n - (b.requests % b.n)
}
b.metricsTracer.UpdatedBlackHoleFilterState(
b.name,
b.state,
b.n-b.requests,
nextRequestAllowedAfter,
successFraction,
)
}
Expand All @@ -160,44 +165,47 @@ type blackHoleDetector struct {
udp, ipv6 *blackHoleFilter
}

func (d *blackHoleDetector) IsAllowed(addr ma.Multiaddr) bool {
func (d *blackHoleDetector) HandleRequest(addr ma.Multiaddr) bool {
if !manet.IsPublicAddr(addr) {
return true
}

udpState, udpAllowed := blackHoleStateAllowed, true
udpres := blackHoleResultAllowed
if d.udp != nil && isProtocolAddr(addr, ma.P_UDP) {
udpState, udpAllowed = d.udp.IsAllowed()
udpres = d.udp.HandleRequest()
}

ipv6State, ipv6Allowed := blackHoleStateAllowed, true
ipv6res := blackHoleResultAllowed
if d.ipv6 != nil && isProtocolAddr(addr, ma.P_IP6) {
ipv6State, ipv6Allowed = d.ipv6.IsAllowed()
ipv6res = d.ipv6.HandleRequest()
}

// Allow all probes irrespective of the state of the other filter
if (udpState == blackHoleStateBlocked && udpAllowed) ||
(ipv6State == blackHoleStateBlocked && ipv6Allowed) {
if udpres == blackHoleResultProbing || ipv6res == blackHoleResultProbing {
return true
}
return (udpAllowed && ipv6Allowed)
return (udpres != blackHoleResultBlocked && ipv6res != blackHoleResultBlocked)
}

// RecordOutcome updates the state of the relevant `blackHoleFilter` for addr
func (d *blackHoleDetector) RecordOutcome(addr ma.Multiaddr, success bool) {
// RecordResult updates the state of the relevant `blackHoleFilter` for addr
func (d *blackHoleDetector) RecordResult(addr ma.Multiaddr, success bool) {
if !manet.IsPublicAddr(addr) {
return
}
if d.udp != nil && isProtocolAddr(addr, ma.P_UDP) {
d.udp.RecordOutcome(success)
d.udp.RecordResult(success)
}
if d.ipv6 != nil && isProtocolAddr(addr, ma.P_IP6) {
d.ipv6.RecordOutcome(success)
d.ipv6.RecordResult(success)
}
}

func newBlackHoleDetector(detectUDP, detectIPv6 bool, mt MetricsTracer) *blackHoleDetector {
d := &blackHoleDetector{}

// A black hole is a binary property. On a network if UDP dials are blocked or there is
// no IPv6 connectivity, all dials will fail. So a low min success fraction like 0.01 is
// good enough.
if detectUDP {
d.udp = &blackHoleFilter{n: 100, minSuccessFraction: 0.01, name: "UDP", metricsTracer: mt}
}
Expand Down
77 changes: 39 additions & 38 deletions p2p/net/swarm/black_hole_detector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,90 +8,91 @@ import (

func TestBlackHoleFilterReset(t *testing.T) {
n := 10
bhd := &blackHoleFilter{n: n, minSuccessFraction: 0.05, name: "test"}
bhf := &blackHoleFilter{n: n, minSuccessFraction: 0.05, name: "test"}
var i = 0
// calls up to threshold should be allowed
for i = 1; i <= n; i++ {
if _, isAllowed := bhd.IsAllowed(); !isAllowed {
if bhf.HandleRequest() != blackHoleResultAllowed {
t.Fatalf("expected calls up to minDials to be allowed")
}
bhd.RecordOutcome(false)
bhf.RecordResult(false)
}

// after threshold calls every nth call should be allowed
for i = n + 1; i < 42; i++ {
_, isAllowed := bhd.IsAllowed()
if (i%n == 0 && !isAllowed) || (i%n != 0 && isAllowed) {
result := bhf.HandleRequest()
if (i%n == 0 && result != blackHoleResultProbing) || (i%n != 0 && result != blackHoleResultBlocked) {
t.Fatalf("expected every nth dial to be allowed")
}
}

bhd.RecordOutcome(true)
bhf.RecordResult(true)
// check if calls up to threshold are allowed after success
for i = 0; i < n; i++ {
if _, isAllowed := bhd.IsAllowed(); !isAllowed {
if bhf.HandleRequest() != blackHoleResultAllowed {
t.Fatalf("expected black hole detector state to reset after success")
}
bhd.RecordOutcome(false)
bhf.RecordResult(false)
}

// next call should be refused
if _, isAllowed := bhd.IsAllowed(); isAllowed {
if bhf.HandleRequest() != blackHoleResultBlocked {
t.Fatalf("expected dial to be blocked")
}
}

func TestBlackHoleDetector(t *testing.T) {
func TestBlackHoleFilterSuccessFraction(t *testing.T) {
n := 10
bhd := &blackHoleFilter{n: n, minSuccessFraction: 0.4, name: "test"}
bhf := &blackHoleFilter{n: n, minSuccessFraction: 0.4, name: "test"}
var i = 0
// 5 success and 5 fails
for i = 1; i <= 5; i++ {
bhd.RecordOutcome(true)
bhf.RecordResult(true)
}
for i = 1; i <= 5; i++ {
bhd.RecordOutcome(false)
bhf.RecordResult(false)
}

if _, isAllowed := bhd.IsAllowed(); !isAllowed {
if bhf.HandleRequest() != blackHoleResultAllowed {
t.Fatalf("expected dial to be allowed")
}
// 4 success and 6 fails
bhd.RecordOutcome(false)
bhf.RecordResult(false)

if _, isAllowed := bhd.IsAllowed(); !isAllowed {
if bhf.HandleRequest() != blackHoleResultAllowed {
t.Fatalf("expected dial to be allowed")
}
// 3 success and 7 fails
bhd.RecordOutcome(false)
bhf.RecordResult(false)

// should be blocked
if _, isAllowed := bhd.IsAllowed(); isAllowed {
if bhf.HandleRequest() != blackHoleResultBlocked {
t.Fatalf("expected dial to be blocked")
}

bhd.RecordOutcome(true)
bhf.RecordResult(true)
// 5 success and 5 fails
for i = 1; i <= 5; i++ {
bhd.RecordOutcome(true)
bhf.RecordResult(true)
}
for i = 1; i <= 5; i++ {
bhd.RecordOutcome(false)
bhf.RecordResult(false)
}

if _, isAllowed := bhd.IsAllowed(); !isAllowed {
if bhf.HandleRequest() != blackHoleResultAllowed {
t.Fatalf("expected dial to be allowed")
}
// 4 success and 6 fails
bhd.RecordOutcome(false)
bhf.RecordResult(false)

if _, isAllowed := bhd.IsAllowed(); !isAllowed {
if bhf.HandleRequest() != blackHoleResultAllowed {
t.Fatalf("expected dial to be allowed")
}
// 3 success and 7 fails
bhd.RecordOutcome(false)
bhf.RecordResult(false)

// should be blocked
if _, isAllowed := bhd.IsAllowed(); isAllowed {
if bhf.HandleRequest() != blackHoleResultBlocked {
t.Fatalf("expected dial to be blocked")
}

Expand All @@ -101,28 +102,28 @@ func TestBlackHoleDetectorInApplicableAddress(t *testing.T) {
bhd := newBlackHoleDetector(true, true, nil)
addr := ma.StringCast("/ip4/127.0.0.1/tcp/1234")
for i := 0; i < 1000; i++ {
if !bhd.IsAllowed(addr) {
if !bhd.HandleRequest(addr) {
t.Fatalf("expect dials to inapplicable address to always be allowed")
}
bhd.RecordOutcome(addr, false)
bhd.RecordResult(addr, false)
}
}

func TestBlackHoleDetectorUDP(t *testing.T) {
bhd := newBlackHoleDetector(true, true, nil)
addr := ma.StringCast("/ip4/1.2.3.4/udp/1234")
for i := 0; i < 100; i++ {
bhd.RecordOutcome(addr, false)
bhd.RecordResult(addr, false)
}
if bhd.IsAllowed(addr) {
if bhd.HandleRequest(addr) {
t.Fatalf("expect dial to be be blocked")
}

bhd = newBlackHoleDetector(false, true, nil)
for i := 0; i < 100; i++ {
bhd.RecordOutcome(addr, false)
bhd.RecordResult(addr, false)
}
if !bhd.IsAllowed(addr) {
if !bhd.HandleRequest(addr) {
t.Fatalf("expected dial to be be allowed when UDP detection is disabled")
}
}
Expand All @@ -131,17 +132,17 @@ func TestBlackHoleDetectorIPv6(t *testing.T) {
bhd := newBlackHoleDetector(true, true, nil)
addr := ma.StringCast("/ip6/1::1/tcp/1234")
for i := 0; i < 100; i++ {
bhd.RecordOutcome(addr, false)
bhd.RecordResult(addr, false)
}
if bhd.IsAllowed(addr) {
if bhd.HandleRequest(addr) {
t.Fatalf("expect dial to be be blocked")
}

bhd = newBlackHoleDetector(true, false, nil)
for i := 0; i < 100; i++ {
bhd.RecordOutcome(addr, false)
bhd.RecordResult(addr, false)
}
if !bhd.IsAllowed(addr) {
if !bhd.HandleRequest(addr) {
t.Fatalf("expected dial to be be allowed when IPv6 detection is disabled")
}
}
Expand All @@ -153,10 +154,10 @@ func TestBlackHoleDetectorProbes(t *testing.T) {
}
udp6Addr := ma.StringCast("/ip6/1::1/udp/1234/quic-v1")
for i := 0; i < 3; i++ {
bhd.RecordOutcome(udp6Addr, false)
bhd.RecordResult(udp6Addr, false)
}
for i := 1; i < 100; i++ {
isAllowed := bhd.IsAllowed(udp6Addr)
isAllowed := bhd.HandleRequest(udp6Addr)
if i%2 == 0 || i%3 == 0 {
if !isAllowed {
t.Fatalf("expected probe to be allowed irrespective of the state of other black hole filter")
Expand Down
Loading

0 comments on commit bbdcb0a

Please sign in to comment.