Skip to content

Commit

Permalink
create Probe test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
zeroFruit committed Mar 26, 2019
1 parent 2d9ff39 commit 4d1bf4e
Show file tree
Hide file tree
Showing 2 changed files with 814 additions and 160 deletions.
132 changes: 65 additions & 67 deletions swim.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ package swim
import (
"context"
"errors"
"net"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -306,12 +304,6 @@ func (s *SWIM) ShutDown() {
s.quitFD <- struct{}{}
}

// address builds the network address of the local node (this SWIM instance)
// by joining the configured bind address and bind port with net.JoinHostPort.
// TODO: I think there's better way to store local-node(myself) status
func (s *SWIM) address() string {
	host := s.config.BindAddress
	port := strconv.Itoa(s.config.BindPort)

	return net.JoinHostPort(host, port)
}

// Total Failure Detection is performed for each `T`. (ref: https://github.com/DE-labtory/swim/edit/develop/docs/Docs.md)
//
// 1. SWIM randomly selects a member(j) in the memberMap and ping to the member(j).
Expand Down Expand Up @@ -382,7 +374,7 @@ func (s *SWIM) probe(member Member) {
end <- struct{}{}
return
}
if err == ErrSendTimeout {
if err != ErrSendTimeout {
iLogger.Errorf(nil, "Error occurred when ping failed [%s]", err.Error())
return
}
Expand All @@ -398,25 +390,23 @@ func (s *SWIM) probe(member Member) {

T := time.NewTimer(time.Millisecond * time.Duration(s.config.T))

for {
select {
// if successfully probed target member, then just return
case <-end:
iLogger.Infof(nil, "probe member [%s] successfully finished", member.ID)
return
select {
// if successfully probed target member, then just return
case <-end:
iLogger.Infof(nil, "probe member [%s] successfully finished", member.ID)
return

// if probe failed, then suspect member
case <-failed:
iLogger.Infof(nil, "probe member [%s] failed, start suspect", member.ID)
s.suspect(&member)
return
// if probe failed, then suspect member
case <-failed:
iLogger.Infof(nil, "probe member [%s] failed, start suspect", member.ID)
s.suspect(&member)
return

// if timed-out, then suspect member
case <-T.C:
iLogger.Infof(nil, "probe member [%s] failed, start suspect", member.ID)
s.suspect(&member)
return
}
// if timed-out, then suspect member
case <-T.C:
iLogger.Infof(nil, "probe member [%s] failed, start suspect", member.ID)
s.suspect(&member)
return
}
}

Expand All @@ -431,25 +421,28 @@ func (s *SWIM) indirectProbe(target *Member) error {
returnedNackCounter := 0
invalidResponseCounter := 0

// select k-random member from member map, then sends indirect-ping
k := s.config.K

wg.Add(k)

// with cancel we can send the signal to goroutines which share
// this @ctx context
ctx, cancel := context.WithCancel(context.Background())

indResponse := make(chan pb.Message)
done := make(chan pb.Message, k)

defer func() {
cancel()
close(indResponse)
close(done)
}()

// select k-random member from member map, then sends indirect-ping
k := s.config.K

wg.Add(k)

kMembers := s.memberMap.SelectKRandomMemberID(k)
for _, member := range kMembers {
go s.indirectPing(ctx, wg, &member, target, indResponse)
go func() {
s.startIndirectPingRunner(ctx, done, member, *target)
wg.Done()
}()
}

// wait until k-random member sends back response, if response message
Expand All @@ -461,7 +454,7 @@ func (s *SWIM) indirectProbe(target *Member) error {

for {
select {
case msg := <-indResponse:
case msg := <-done:
switch msg.Payload.(type) {
case *pb.Message_Ack:
// if one of members received ACK message then send the cancel
Expand All @@ -487,6 +480,28 @@ func (s *SWIM) indirectProbe(target *Member) error {
}
}

// startIndirectPingRunner runs a single indirect-ping through @mediator towards
// @target and forwards the outcome to @responseChan.
//
// The indirect-ping itself runs in its own goroutine so that this function can
// stop waiting as soon as @ctx is cancelled (for example when another runner
// sharing the same @ctx already obtained a response).
//
// Outcomes:
//  1. indirectPing reports a message: the message is sent to @responseChan.
//  2. indirectPing reports an error: the error is logged and an empty
//     pb.Message is sent to @responseChan.
//  3. @ctx is cancelled first: nothing is sent and the function returns.
func (s *SWIM) startIndirectPingRunner(ctx context.Context, responseChan chan<- pb.Message, mediator, target Member) {
	// Buffered with capacity 1 so the worker goroutine can always hand over its
	// single result and terminate, even when @ctx was cancelled and nobody is
	// receiving from this channel anymore. An unbuffered channel would leave
	// the worker blocked forever in that case.
	done := make(chan pb.Message, 1)

	go func() {
		s.indirectPing(mediator, target, func(err error, msg pb.Message) {
			if err != nil {
				iLogger.Error(nil, err.Error())
				done <- pb.Message{}
				// Report exactly one result per callback invocation; falling
				// through would attempt a second send that is never received.
				return
			}
			done <- msg
		})
	}()

	select {
	case msg := <-done:
		responseChan <- msg
	case <-ctx.Done():
		// Cancelled from outside: drop this runner's result.
	}
}

// ping sends a ping message, together with a piggyback message, to the member; after sending the ping message
// the result can be:
// 1. timeout
Expand All @@ -502,7 +517,7 @@ func (s *SWIM) ping(target *Member) error {

// send ping message
addr := target.Address()
ping := createPingMessage(s.address(), &stats)
ping := createPingMessage(s.member.Address(), &stats)

res, err := s.messageEndpoint.SyncSend(addr, ping)
if err != nil {
Expand All @@ -520,44 +535,29 @@ func (s *SWIM) ping(target *Member) error {
// otherwise just return **
// @ctx is for sending cancel signal from outside, when one of k-member successfully
// received ACK message or when in the exceptional situation
func (s *SWIM) indirectPing(ctx context.Context, wg *sync.WaitGroup, mediator, target *Member, indSucc chan pb.Message) {
defer wg.Done()

func (s *SWIM) indirectPing(mediator, target Member, cb func(err error, msg pb.Message)) {
stats, err := s.mbrStatsMsgStore.Get()
if err != nil {
iLogger.Error(nil, err.Error())
cb(err, pb.Message{})
return
}

done := make(chan pb.Message)

// send indirect-ping message
addr := mediator.Address()
ind := createIndMessage(s.address(), target.Address(), &stats)

go func() {
res, err := s.messageEndpoint.SyncSend(addr, ind)

// when communicating member and target with indirect-ping failed,
// just return.
if err != nil {
iLogger.Error(nil, err.Error())
return
}
// update piggyback data to store
s.handlePbk(res.PiggyBack)
done <- res
}()
ind := createIndMessage(s.member.Address(), target.Address(), &stats)

select {
case res := <-done:
indSucc <- res
return
res, err := s.messageEndpoint.SyncSend(addr, ind)

// when one of the k-member received ACK message then
// cancel my indirect-probe
case <-ctx.Done():
// when communicating member and target with indirect-ping failed,
// just return.
if err != nil {
cb(err, pb.Message{})
return
}
// update piggyback data to store
s.handlePbk(res.PiggyBack)

cb(nil, res)
}

func (s *SWIM) suspect(member *Member) {
Expand Down Expand Up @@ -605,7 +605,6 @@ func (s *SWIM) handle(msg pb.Message) {

// handle piggyback related to member status
func (s *SWIM) handlePbk(piggyBack *pb.PiggyBack) {

mbrStatsMsg := piggyBack.MbrStatsMsg

// Check if piggyback message changes memberMap.
Expand Down Expand Up @@ -649,13 +648,12 @@ func (s *SWIM) handleIndirectPing(msg pb.Message) {
// address of indirect-ping's target
targetAddr := msg.Payload.(*pb.Message_IndirectPing).IndirectPing.Target

ping := createPingMessage(srcAddr, &mbrStatsMsg)
ping := createPingMessage(s.member.Address(), &mbrStatsMsg)

// first send the ping to target member, if target member could not send-back
// ack message for whatever reason send nack message to source member,
// if successfully received ack message from target, then send back ack message
// to source member

if _, err := s.messageEndpoint.SyncSend(targetAddr, ping); err != nil {
nack := createNackMessage(id, &mbrStatsMsg)
if err := s.messageEndpoint.Send(srcAddr, nack); err != nil {
Expand Down
Loading

0 comments on commit 4d1bf4e

Please sign in to comment.