Skip to content

Commit

Permalink
implement black hole detection and happy eyeballs
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Jun 2, 2023
1 parent c0649ef commit 0e6beee
Show file tree
Hide file tree
Showing 5 changed files with 521 additions and 71 deletions.
199 changes: 199 additions & 0 deletions p2p/net/swarm/black_hole_wrapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
package swarm

import (
"context"
"errors"
"sync"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/transport"
ma "github.com/multiformats/go-multiaddr"
)

type outcome int

const (
outcomeUnknown outcome = iota
outcomeSuccess
outcomeFailed
)

type blackholeState int

const (
blackholeStateAllowed blackholeState = iota
blackholeStateBlocked
)

var ErrDialRefusedBlackHole error = errors.New("dial refused because of blackhole")

// blackHoleWrapper provides black hole detection for dials to addresses selected by the
// selector.
type blackHoleWrapper struct {
// selector selects addresses for blackhole detection. Addresses for which selector
// returns false are ignored
selector func(addr ma.Multiaddr) bool
mu sync.Mutex
// every nth dial to the address is permitted irrespective of the wrapper status
n int
// threshold is the minimum number of completed dials required before dials are blocked
threshold int
// minSuccessFraction is the minimum success fraction required to allow dials
minSuccessFraction float64
// outcomes is the sliding window of last x outcomes
outcomes []outcome
// outcomeIdx is the index of the next outcome
outcomeIdx int
successCnt, failureCnt, total int
// full is used to check whether we have a full sliding window worth of outcomes.
// Keeping this as a separate variable helps avoid clearing out the entire outcomes sliding
// window on reset.
full bool
// state is the current state of the blackhole wrapper
state blackholeState
// name of the detector. Useful for debugging
name string
}

func newBlackHoleWrapper(selector func(addr ma.Multiaddr) bool, n int, threshold int, minSuccessFraction float64, slidingWindowSize int, name string) *blackHoleWrapper {
return &blackHoleWrapper{
selector: selector,
n: n,
threshold: threshold,
minSuccessFraction: minSuccessFraction,
outcomes: make([]outcome, slidingWindowSize),
name: name,
}
}

func newIPv6BlackHoleWrapper() *blackHoleWrapper {
return newBlackHoleWrapper(
func(addr ma.Multiaddr) bool {
isIPv6 := false
ma.ForEach(addr, func(c ma.Component) bool {
if c.Protocol().Code == ma.P_IP6 {
isIPv6 = true
}
return false
})
return isIPv6
},
100,
100,
0.01,
1000,
"IPv6",
)
}

func newUDPBlackHoleWrapper() *blackHoleWrapper {
return newBlackHoleWrapper(
func(addr ma.Multiaddr) bool {
isUDP := false
ma.ForEach(addr, func(c ma.Component) bool {
if c.Protocol().Code == ma.P_UDP {
isUDP = true
}
return true
})
return isUDP
},
100,
100,
0.01,
1000,
"UDP",
)
}

func (b *blackHoleWrapper) Wrap(f dialfunc) dialfunc {
return func(ctx context.Context, i peer.ID, m ma.Multiaddr) (transport.CapableConn, error) {
if !b.selector(m) {
return f(ctx, i, m)
}

b.mu.Lock()
if !b.isAllowed() {
b.mu.Unlock()
return nil, ErrDialRefusedBlackHole
}
b.mu.Unlock()

conn, err := f(ctx, i, m)

b.mu.Lock()
defer b.mu.Unlock()

if b.state == blackholeStateBlocked && err == nil {
// If the call succeeds in a blocked state we reset the whole state to allowed.
// This is better than slowly accumulating values till we cross the minSuccessFraction
// threshold since a udp or ipv6 blackhole is a binary property. So it is better to just
// allow values up to threshold on success.
b.reset()
return conn, err
}

// Discard the earliest outcome. b.full check is required to ensure that
// we don't incorrectly decrement for outcomes after a reset
if b.full {
switch b.outcomes[b.outcomeIdx] {
case outcomeSuccess:
b.successCnt--
case outcomeFailed:
b.failureCnt--
}
}

switch {
// checking for err == nil is easier for testing than conn != nil
case err == nil:
b.successCnt++
b.outcomes[b.outcomeIdx] = outcomeSuccess
default:
b.failureCnt++
b.outcomes[b.outcomeIdx] = outcomeFailed
}

b.outcomeIdx++
if b.outcomeIdx == len(b.outcomes) {
b.outcomeIdx = 0
b.full = true
}
b.updateState()
return conn, err
}
}

func (b *blackHoleWrapper) isAllowed() bool {
b.total++
if b.total%b.n == 0 {
return true
}
return b.state == blackholeStateAllowed
}

func (b *blackHoleWrapper) reset() {
b.successCnt = 0
b.failureCnt = 0
b.outcomeIdx = 0
b.full = false
b.updateState()
}

func (b *blackHoleWrapper) updateState() {
st := b.state
if b.successCnt+b.failureCnt < b.threshold {
b.state = blackholeStateAllowed
} else if float64(b.successCnt)/float64(b.successCnt+b.failureCnt) >= b.minSuccessFraction {
b.state = blackholeStateAllowed
} else {
b.state = blackholeStateBlocked
}
if st != b.state {
if b.state == blackholeStateAllowed {
log.Debugf("blackholeWrapper %s state changed to %s", b.name, "Allowed")
} else {
log.Debugf("blackholeWrapper %s state changed to %s", b.name, "Blocked")
}
}
}
164 changes: 164 additions & 0 deletions p2p/net/swarm/black_hole_wrapper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package swarm

import (
"context"
"errors"
"testing"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/transport"
ma "github.com/multiformats/go-multiaddr"
)

func TestBlackHoleWrapperIrrelevantAddress(t *testing.T) {
ch := make(chan struct{}, 1)
var f dialfunc = func(ctx context.Context, i peer.ID, m ma.Multiaddr) (transport.CapableConn, error) {
ch <- struct{}{}
return nil, nil
}
bhd := newBlackHoleWrapper(func(_ ma.Multiaddr) bool { return false },
10, 10, 1.0, 10, "")
wrappedF := bhd.Wrap(f)
addr := ma.StringCast("/ip4/127.0.0.1/tcp/1024")
for i := 0; i < 1000; i++ {
wrappedF(context.Background(), "peerID", addr)
<-ch
}

}

func TestBlackHoleWrapperReset(t *testing.T) {
successCh := make(chan struct{}, 1)
failCh := make(chan struct{}, 1)

errFailedDial := errors.New("failed dial")
var f dialfunc = func(ctx context.Context, i peer.ID, m ma.Multiaddr) (transport.CapableConn, error) {
select {
case <-successCh:
return nil, nil
case <-failCh:
return nil, errFailedDial
}
}
n := 10
threshold := 10
bhd := newBlackHoleWrapper(func(_ ma.Multiaddr) bool { return true },
n, threshold, 0.2, 100, "")
wrappedF := bhd.Wrap(f)
addr := ma.StringCast("/ip4/127.0.0.1/tcp/1024")
var i = 0
// calls up to threshold should be allowed
for i = 1; i <= threshold; i++ {
failCh <- struct{}{}
_, err := wrappedF(context.Background(), "peerID", addr)
if err != errFailedDial {
t.Fatalf("expected to receive errFailedDial got %s", err)
}
}
// after threshold calls every nth call should be allowed and everything else should
// be refused
for i = threshold + 1; i < 1000; i++ {
select {
case failCh <- struct{}{}:
default:
}
_, err := wrappedF(context.Background(), "peerID", addr)
if i%n == 0 {
if err != errFailedDial {
t.Fatalf("expected to receive errFailedDial: %s", err)
}
} else {
if err != ErrDialRefusedBlackHole {
t.Fatalf("epxected blackhole detector to block call: %s", err)
}
}
}
// remove any left over items from failCh
select {
case <-failCh:
default:
}
// on a successful call, the wrapper should reset
successCh <- struct{}{}
for i = 1000; ; i++ {
_, err := wrappedF(context.Background(), "peerID", addr)
if i%n == 0 {
// only nth calls are allowed in blocked state
if err != nil {
t.Fatalf("expected err nil, got: %s", err)
}
break
} else {
if err != ErrDialRefusedBlackHole {
t.Fatalf("epxected blackhole detector to block call: %s", err)
}
}
}
// check if calls up to threshold are allowed
for i = 0; i < threshold; i++ {
failCh <- struct{}{}
_, err := wrappedF(context.Background(), "peerID", addr)
if err != errFailedDial {
t.Fatalf("expected to receive errFailedDial got %s", err)
}
}

// next call should be refused
_, err := wrappedF(context.Background(), "peerID", addr)
if err != ErrDialRefusedBlackHole {
t.Fatalf("expected to receive %s got %s", ErrDialRefusedBlackHole, err)
}
}

func TestBlackHoleWrapperSuccessFraction(t *testing.T) {
successCh := make(chan struct{}, 1)
failCh := make(chan struct{}, 1)

errFailedDial := errors.New("failed dial")
var f dialfunc = func(ctx context.Context, i peer.ID, m ma.Multiaddr) (transport.CapableConn, error) {
select {
case <-successCh:
return nil, nil
case <-failCh:
return nil, errFailedDial
}
}
n := 100
threshold := 10
windowSize := 10
bhd := newBlackHoleWrapper(func(_ ma.Multiaddr) bool { return true },
n, threshold, 0.4, windowSize, "")
wrappedF := bhd.Wrap(f)
addr := ma.StringCast("/ip4/127.0.0.1/tcp/1024")
var i = 0
// 5 success and 5 fails
for i = 1; i <= 5; i++ {
successCh <- struct{}{}
wrappedF(context.Background(), "peerID", addr)
}
for i = 1; i <= 5; i++ {
failCh <- struct{}{}
wrappedF(context.Background(), "peerID", addr)
}

// 4 success and 6 fails
failCh <- struct{}{}
_, err := wrappedF(context.Background(), "peerID", addr)
if err != errFailedDial {
t.Fatalf("expected to receive errFailedDial: %s", err)
}

// 3 success and 7 fails
failCh <- struct{}{}
_, err = wrappedF(context.Background(), "peerID", addr)
if err != errFailedDial {
t.Fatalf("expected to receive errFailedDial: %s", err)
}

// should be blocked
failCh <- struct{}{}
_, err = wrappedF(context.Background(), "peerID", addr)
if err != ErrDialRefusedBlackHole {
t.Fatalf("expected to receive ErrDialRefusedBlackHole: %s", err)
}
}
Loading

0 comments on commit 0e6beee

Please sign in to comment.