Skip to content

Commit

Permalink
swarm: implement blackhole detection and happy eyeballs dialing
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Jun 5, 2023
1 parent 6f27081 commit 57de43c
Show file tree
Hide file tree
Showing 6 changed files with 630 additions and 97 deletions.
216 changes: 216 additions & 0 deletions p2p/net/swarm/black_hole_detector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
package swarm

import (
"context"
"errors"
"sync"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/transport"
ma "github.com/multiformats/go-multiaddr"
)

type outcome int

const (
outcomeSuccess outcome = iota
outcomeFailed
)

type blackholeState int

const (
blackholeStateAllowed blackholeState = iota
blackholeStateBlocked
)

var ErrDialRefusedBlackHole error = errors.New("dial refused because of black hole")

// blackHoleDetector provides black hole detection for dials to addresses selected by the
// selector. On detecting that dials to addresses selected by the selector are failing,
// subsequent dials to the addresses are refused and only 1 dial every n attempts is allowed.
type blackHoleDetector struct {
// selector selects addresses for blackhole detection. Dials to addresses for which
// the selector returns false are always allowed.
selector func(addr ma.Multiaddr) bool
// every nth dial to the address is permitted irrespective of the wrapper status
n int
// minDials is the minimum number of completed dials required before dials are blocked
minDials int
// minSuccessFraction is the minimum success fraction required to allow dials
minSuccessFraction float64
// name for the detector. Useful for debugging
name string

// requests counts number of dial requests up to nth request. Resets to 0 every nth request.
requests int
// allowed counts the number of dials allowed up to `minDials`
allowed int
// outcomes of the last x allowed dials
outcomes []outcome
// outcomeIdx is the index of the next outcome in the sliding window
outcomeIdx int
// successes is the count of successful dials in outcomes
successes int
// failures is the count of failed dials in outcomes
failures int
// full is true when we have a full sliding window worth of outcomes.
// Keeping this as a separate variable helps avoid clearing out the entire sliding window on
// reset.
full bool
// state is the current state of the detector
state blackholeState

mu sync.Mutex
}

func newBlackHoleDetector(selector func(addr ma.Multiaddr) bool, name string, allowNth int, minDials int, slidingWindowSize int, minSuccessFraction float64) *blackHoleDetector {
return &blackHoleDetector{
selector: selector,
n: allowNth,
minDials: minDials,
minSuccessFraction: minSuccessFraction,
outcomes: make([]outcome, slidingWindowSize),
name: name,
}
}

func newIPv6BlackHoleWrapper() *blackHoleDetector {
return newBlackHoleDetector(
func(addr ma.Multiaddr) bool {
isIPv6 := false
ma.ForEach(addr, func(c ma.Component) bool {
if c.Protocol().Code == ma.P_IP6 {
isIPv6 = true
}
return false
})
return isIPv6
},
"IPv6",
100,
100,
1000,
0.01,
)
}

func newUDPBlackHoleWrapper() *blackHoleDetector {
return newBlackHoleDetector(
func(addr ma.Multiaddr) bool {
isUDP := false
ma.ForEach(addr, func(c ma.Component) bool {
if c.Protocol().Code == ma.P_UDP {
isUDP = true
}
return true
})
return isUDP
},
"UDP",
100,
100,
1000,
0.01,
)
}

func (b *blackHoleDetector) Wrap(f dialfunc) dialfunc {
return func(ctx context.Context, i peer.ID, m ma.Multiaddr) (transport.CapableConn, error) {
if !b.selector(m) {
return f(ctx, i, m)
}

b.mu.Lock()
if !b.isAllowed() {
b.mu.Unlock()
return nil, ErrDialRefusedBlackHole
}
b.mu.Unlock()

conn, err := f(ctx, i, m)

b.mu.Lock()
defer b.mu.Unlock()

if b.state == blackholeStateBlocked && err == nil {
// If the call succeeds in a blocked state we reset to allowed.
// This is better than slowly accumulating values till we cross the minSuccessFraction
// threshold since a blackhole is a binary property.
b.reset()
return conn, err
}

b.allowed++
if b.allowed > b.minDials {
b.allowed = b.minDials
}

// Discard the earliest outcome
if b.full {
switch b.outcomes[b.outcomeIdx] {
case outcomeSuccess:
b.successes--
case outcomeFailed:
b.failures--
}
}
switch {
case conn != nil:
b.successes++
b.outcomes[b.outcomeIdx] = outcomeSuccess
default:
b.failures++
b.outcomes[b.outcomeIdx] = outcomeFailed
}

b.outcomeIdx++
if b.outcomeIdx == len(b.outcomes) {
b.outcomeIdx = 0
b.full = true
}

b.updateState()
return conn, err
}
}

func (b *blackHoleDetector) isAllowed() bool {
b.requests++
if b.requests == b.n {
b.requests = 0
return true
}
return b.state == blackholeStateAllowed
}

func (b *blackHoleDetector) reset() {
b.allowed = 0
b.successes = 0
b.failures = 0
b.outcomeIdx = 0
b.full = false
b.updateState()
}

func (b *blackHoleDetector) updateState() {
st := b.state
successFraction := 0.0
if b.allowed < b.minDials {
b.state = blackholeStateAllowed
} else {
successFraction = float64(b.successes) / float64(b.successes+b.failures)
if successFraction >= b.minSuccessFraction {
b.state = blackholeStateAllowed
} else {
b.state = blackholeStateBlocked
}
}
if st != b.state {
if b.state == blackholeStateAllowed {
log.Debugf("%s blackHoleDetector state changed to Allowed", b.name)
} else {
log.Debugf("%s blackHoleDetector state changed to Blocked. Success fraction %0.3f", b.name, successFraction)
}
}
}
Loading

0 comments on commit 57de43c

Please sign in to comment.