Skip to content

Commit

Permalink
move timeout logic to failure detector
Browse files Browse the repository at this point in the history
  • Loading branch information
zeroFruit committed Mar 26, 2019
1 parent 5ad896d commit fa46e20
Showing 1 changed file with 26 additions and 6 deletions.
32 changes: 26 additions & 6 deletions swim.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,11 +329,31 @@ func (s *SWIM) ShutDown() {
func (s *SWIM) startFailureDetector() {

go func() {
wg := &sync.WaitGroup{}
done := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())

for {
// Get copy of current members from memberMap.
members := s.memberMap.GetMembers()
for _, member := range members {
s.probe(member)
T := time.NewTimer(time.Millisecond * time.Duration(s.config.T))
wg.Add(1)

go func() {
// Get copy of current members from memberMap.
members := s.memberMap.GetMembers()
for _, member := range members {
s.probe(ctx, member)
}

wg.Done()
done <- struct{}{}
}()

select {
case <-done:
continue
case <-T.C:
cancel()
wg.Wait()
}

// Reset memberMap.
Expand All @@ -355,7 +375,7 @@ func (s *SWIM) startFailureDetector() {
// 3. At the end of T, SWIM checks to see if ack was received from k members, and if there is no message,
// The member(j) is judged to be failed, so check the member(j) as suspected or delete the member(j) from memberMap.
//
func (s *SWIM) probe(member Member) {
func (s *SWIM) probe(ctx context.Context, member Member) {

if member.Status == Dead {
return
Expand Down Expand Up @@ -405,7 +425,7 @@ func (s *SWIM) probe(member Member) {
return

// if timed-out, then suspect member
case <-T.C:
case <-ctx.Done():
iLogger.Infof(nil, "probe member [%s] failed, start suspect", member.ID)
s.awareness.ApplyDelta(1)
s.suspect(&member)
Expand Down

0 comments on commit fa46e20

Please sign in to comment.