Skip to content

Commit

Permalink
create startFailureDetector test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
zeroFruit committed Mar 26, 2019
1 parent fa46e20 commit 757f050
Show file tree
Hide file tree
Showing 3 changed files with 250 additions and 132 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ require (
github.com/btcsuite/btcutil v0.0.0-20180706230648-ab6388e0c60a // indirect
github.com/gogo/protobuf v1.1.1
github.com/golang/protobuf v1.2.0
github.com/it-chain/heimdall v0.2.4 // indirect
github.com/it-chain/heimdall v0.2.4
github.com/it-chain/iLogger v0.0.0-20180921150123-3d4855e59818
github.com/rs/xid v1.2.1
github.com/sirupsen/logrus v1.2.0 // indirect
Expand Down
159 changes: 66 additions & 93 deletions swim.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ var ErrInvalidMbrStatsMsgType = errors.New("error invalid mbrStatsMsg type")
var ErrAllSentNackMsg = errors.New("error all of m_k members sent back nack message")
var ErrAllSentInvalidMsg = errors.New("error all of m_k members sent back invalid message")

// ProbeResponse is the result that probe's worker goroutine reports back over
// a channel once pinging a member has finished.
type ProbeResponse struct {
// err is nil when the probe succeeded; when it is non-nil, probe raises the
// awareness score and suspects the member.
err error
}

type Config struct {

// The maximum number of times the same piggyback data can be queried
Expand Down Expand Up @@ -327,33 +331,24 @@ func (s *SWIM) ShutDown() {
// 3. After finishing probing all members, Reset memberMap
//
func (s *SWIM) startFailureDetector() {

go func() {
wg := &sync.WaitGroup{}
done := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
interval := time.Millisecond * time.Duration(s.config.T)
T := time.NewTimer(interval)

for {
T := time.NewTimer(time.Millisecond * time.Duration(s.config.T))
wg.Add(1)

go func() {
// Get copy of current members from memberMap.
members := s.memberMap.GetMembers()
for _, member := range members {
s.probe(ctx, member)
}
members := s.memberMap.GetMembers()
if len(members) == 0 {
<-T.C
T.Reset(interval)
continue
}

wg.Done()
done <- struct{}{}
}()
for _, m := range members {
iLogger.Info(nil, "[swim] start probing ...")
s.probe(m, T)

select {
case <-done:
continue
case <-T.C:
cancel()
wg.Wait()
iLogger.Info(nil, "[swim] done probing !")
T.Reset(interval)
}

// Reset memberMap.
Expand All @@ -375,23 +370,21 @@ func (s *SWIM) startFailureDetector() {
// 3. At the end of T, SWIM checks whether an ack was received from the k members; if no message arrived,
// member(j) is judged to have failed, so member(j) is marked as suspected or deleted from memberMap.
//
func (s *SWIM) probe(ctx context.Context, member Member) {
func (s *SWIM) probe(member Member, timer *time.Timer) {

if member.Status == Dead {
return
}

end := make(chan struct{}, 1)
failed := make(chan struct{}, 1)
end := make(chan ProbeResponse, 1)
defer func() {
close(end)
close(failed)
}()

go func() {
err := s.ping(&member)
if err == nil {
end <- struct{}{}
end <- ProbeResponse{}
return
}
if err != ErrSendTimeout {
Expand All @@ -401,35 +394,30 @@ func (s *SWIM) probe(ctx context.Context, member Member) {

err = s.indirectProbe(&member)
if err != nil {
failed <- struct{}{}
end <- ProbeResponse{err: err}
return
}

end <- struct{}{}
end <- ProbeResponse{}
}()

T := time.NewTimer(time.Millisecond * time.Duration(s.config.T))

select {
// if successfully probed target member, then just return
case <-end:
iLogger.Infof(nil, "probe member [%s] successfully finished", member.ID)
s.awareness.ApplyDelta(-1)
return

// if probe failed, then suspect member
case <-failed:
iLogger.Infof(nil, "probe member [%s] failed, start suspect", member.ID)
s.awareness.ApplyDelta(1)
s.suspect(&member)
return

// if timed-out, then suspect member
case <-ctx.Done():
iLogger.Infof(nil, "probe member [%s] failed, start suspect", member.ID)
case <-timer.C:
iLogger.Infof(nil, "[SWIM] probe member [%s] timed out, start suspect", member.ID)
s.awareness.ApplyDelta(1)
s.suspect(&member)
return

// if probe ended with error then suspect member and increase Awareness
// otherwise just decrease Awareness score
case resp := <-end:
if resp.err != nil {
s.awareness.ApplyDelta(1)
s.suspect(&member)
return
}

s.awareness.ApplyDelta(-1)
}
}

Expand All @@ -440,32 +428,39 @@ func (s *SWIM) probe(ctx context.Context, member Member) {
// k-1 member's probe
func (s *SWIM) indirectProbe(target *Member) error {
wg := &sync.WaitGroup{}

returnedNackCounter := 0
invalidResponseCounter := 0

// select k-random member from member map, then sends indirect-ping
k := s.config.K

wg.Add(k)
wg.Add(s.config.K)

// with cancel we can send the signal to goroutines which share
// this @ctx context
ctx, cancel := context.WithCancel(context.Background())

done := make(chan pb.Message, k)
done := make(chan pb.Message, s.config.K)

defer func() {
cancel()
close(done)
}()

kMembers := s.memberMap.SelectKRandomMemberID(k)
for _, member := range kMembers {
go func() {
s.startIndirectPingRunner(ctx, done, member, *target)
wg.Done()
}()
kMembers := s.memberMap.SelectKRandomMemberID(s.config.K)
for _, m := range kMembers {
go func(mediator Member) {
defer wg.Done()

task := func() (interface{}, error) {
return s.indirectPing(mediator, *target)
}

resp := NewTaskRunner(task, ctx).Start()
if resp.payload == nil {
return
}

msg, ok := resp.payload.(pb.Message)
if !ok {
return
}
done <- msg
}(m)
}

// wait until k-random member sends back response, if response message
Expand All @@ -475,6 +470,9 @@ func (s *SWIM) indirectProbe(target *Member) error {
// if all of k-random members sends back Nack message, then indirectProbe
// failed.

returnedNackCounter := 0
invalidResponseCounter := 0

for {
select {
case msg := <-done:
Expand All @@ -488,43 +486,20 @@ func (s *SWIM) indirectProbe(target *Member) error {
return nil
case *pb.Message_Nack:
returnedNackCounter += 1
if returnedNackCounter == k {
if returnedNackCounter == s.config.K {
return ErrAllSentNackMsg
}
default:
iLogger.Errorf(nil, "Invalid message type from [%s]", msg.Address)

invalidResponseCounter += 1
if invalidResponseCounter == k {
if invalidResponseCounter == s.config.K {
return ErrAllSentInvalidMsg
}
}
}
}
}

// startIndirectPingRunner asks mediator to indirect-ping target and forwards the
// outcome to responseChan, unless ctx is cancelled first.
//
// indirectPing runs in its own goroutine and reports through a callback. When the
// callback carries an error, the error is logged and an empty pb.Message is
// forwarded in place of a response; otherwise the received message is forwarded.
// When ctx is done before any result arrives, nothing is sent to responseChan.
func (s *SWIM) startIndirectPingRunner(ctx context.Context, responseChan chan<- pb.Message, mediator, target Member) {
	// Capacity 1 lets the ping goroutine always deliver its single result and
	// exit, even when ctx was cancelled and nobody is receiving any more.
	done := make(chan pb.Message, 1)

	go func() {
		s.indirectPing(mediator, target, func(err error, msg pb.Message) {
			if err != nil {
				iLogger.Error(nil, err.Error())
				done <- pb.Message{}
				// Stop here: falling through would attempt a second send that
				// no receiver ever takes, blocking this goroutine forever.
				// NOTE(review): assumes the callback is invoked at most once
				// per indirectPing call.
				return
			}
			done <- msg
		})
	}()

	select {
	case msg := <-done:
		responseChan <- msg
	case <-ctx.Done():
	}
}

// ping pings the member with a piggyback message. After the ping message is sent,
// the result can be:
// 1. timeout
Expand Down Expand Up @@ -558,11 +533,10 @@ func (s *SWIM) ping(target *Member) error {
// otherwise just return **
// @ctx is for sending cancel signal from outside, when one of k-member successfully
// received ACK message or when in the exceptional situation
func (s *SWIM) indirectPing(mediator, target Member, cb func(err error, msg pb.Message)) {
func (s *SWIM) indirectPing(mediator, target Member) (pb.Message, error) {
stats, err := s.mbrStatsMsgStore.Get()
if err != nil {
cb(err, pb.Message{})
return
return pb.Message{}, err
}

// send indirect-ping message
Expand All @@ -574,13 +548,12 @@ func (s *SWIM) indirectPing(mediator, target Member, cb func(err error, msg pb.M
// when communicating member and target with indirect-ping failed,
// just return.
if err != nil {
cb(err, pb.Message{})
return
return pb.Message{}, err
}
// update piggyback data to store
s.handlePbk(res.PiggyBack)

cb(nil, res)
return res, nil
}

func (s *SWIM) suspect(member *Member) {
Expand Down
Loading

0 comments on commit 757f050

Please sign in to comment.