diff --git a/go.mod b/go.mod index 7cd7c5f..e7046cf 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ require ( github.com/btcsuite/btcutil v0.0.0-20180706230648-ab6388e0c60a // indirect github.com/gogo/protobuf v1.1.1 github.com/golang/protobuf v1.2.0 - github.com/it-chain/heimdall v0.2.4 // indirect + github.com/it-chain/heimdall v0.2.4 github.com/it-chain/iLogger v0.0.0-20180921150123-3d4855e59818 github.com/rs/xid v1.2.1 github.com/sirupsen/logrus v1.2.0 // indirect diff --git a/swim.go b/swim.go index d485b9b..1827bb5 100644 --- a/swim.go +++ b/swim.go @@ -33,6 +33,10 @@ var ErrInvalidMbrStatsMsgType = errors.New("error invalid mbrStatsMsg type") var ErrAllSentNackMsg = errors.New("error all of m_k members sent back nack message") var ErrAllSentInvalidMsg = errors.New("error all of m_k members sent back invalid message") +type ProbeResponse struct { + err error +} + type Config struct { // The maximum number of times the same piggyback data can be queried @@ -327,33 +331,24 @@ func (s *SWIM) ShutDown() { // 3. After finishing probing all members, Reset memberMap // func (s *SWIM) startFailureDetector() { - go func() { - wg := &sync.WaitGroup{} - done := make(chan struct{}) - ctx, cancel := context.WithCancel(context.Background()) + interval := time.Millisecond * time.Duration(s.config.T) + T := time.NewTimer(interval) for { - T := time.NewTimer(time.Millisecond * time.Duration(s.config.T)) - wg.Add(1) - - go func() { - // Get copy of current members from memberMap. - members := s.memberMap.GetMembers() - for _, member := range members { - s.probe(ctx, member) - } + members := s.memberMap.GetMembers() + if len(members) == 0 { + <-T.C + T.Reset(interval) + continue + } - wg.Done() - done <- struct{}{} - }() + for _, m := range members { + iLogger.Info(nil, "[swim] start probing ...") + s.probe(m, T) - select { - case <-done: - continue - case <-T.C: - cancel() - wg.Wait() + iLogger.Info(nil, "[swim] done probing !") + T.Reset(interval) } // Reset memberMap. @@ -375,23 +370,21 @@ func (s *SWIM) startFailureDetector() { // 3. At the end of T, SWIM checks to see if ack was received from k members, and if there is no message, // The member(j) is judged to be failed, so check the member(j) as suspected or delete the member(j) from memberMap. // -func (s *SWIM) probe(ctx context.Context, member Member) { +func (s *SWIM) probe(member Member, timer *time.Timer) { if member.Status == Dead { return } - end := make(chan struct{}, 1) - failed := make(chan struct{}, 1) + end := make(chan ProbeResponse, 1) defer func() { close(end) - close(failed) }() go func() { err := s.ping(&member) if err == nil { - end <- struct{}{} + end <- ProbeResponse{} return } if err != ErrSendTimeout { @@ -401,35 +394,30 @@ func (s *SWIM) probe(ctx context.Context, member Member) { err = s.indirectProbe(&member) if err != nil { - failed <- struct{}{} + end <- ProbeResponse{err: err} return } - end <- struct{}{} + end <- ProbeResponse{} }() - T := time.NewTimer(time.Millisecond * time.Duration(s.config.T)) - select { - // if successfully probed target member, then just return - case <-end: - iLogger.Infof(nil, "probe member [%s] successfully finished", member.ID) - s.awareness.ApplyDelta(-1) - return - - // if probe failed, then suspect member - case <-failed: - iLogger.Infof(nil, "probe member [%s] failed, start suspect", member.ID) - s.awareness.ApplyDelta(1) - s.suspect(&member) - return - // if timed-out, then suspect member - case <-ctx.Done(): - iLogger.Infof(nil, "probe member [%s] failed, start suspect", member.ID) + case <-timer.C: + iLogger.Infof(nil, "[SWIM] probe member [%s] timed out, start suspect", member.ID) s.awareness.ApplyDelta(1) s.suspect(&member) - return + + // if probe ended with error then suspect member and increase Awareness + // otherwise just decrease Awareness score + case resp := <-end: + if resp.err != nil { + s.awareness.ApplyDelta(1) + s.suspect(&member) + return + } + + s.awareness.ApplyDelta(-1) } } @@ -440,32 +428,39 @@ func (s *SWIM) probe(ctx context.Context, member Member) { // k-1 member's probe func (s *SWIM) indirectProbe(target *Member) error { wg := &sync.WaitGroup{} - - returnedNackCounter := 0 - invalidResponseCounter := 0 - - // select k-random member from member map, then sends indirect-ping - k := s.config.K - - wg.Add(k) + wg.Add(s.config.K) // with cancel we can send the signal to goroutines which share // this @ctx context ctx, cancel := context.WithCancel(context.Background()) - done := make(chan pb.Message, k) + done := make(chan pb.Message, s.config.K) defer func() { cancel() close(done) }() - kMembers := s.memberMap.SelectKRandomMemberID(k) - for _, member := range kMembers { - go func() { - s.startIndirectPingRunner(ctx, done, member, *target) - wg.Done() - }() + kMembers := s.memberMap.SelectKRandomMemberID(s.config.K) + for _, m := range kMembers { + go func(mediator Member) { + defer wg.Done() + + task := func() (interface{}, error) { + return s.indirectPing(mediator, *target) + } + + resp := NewTaskRunner(task, ctx).Start() + if resp.payload == nil { + return + } + + msg, ok := resp.payload.(pb.Message) + if !ok { + return + } + done <- msg + }(m) } // wait until k-random member sends back response, if response message @@ -475,6 +470,9 @@ func (s *SWIM) indirectProbe(target *Member) error { // if all of k-random members sends back Nack message, then indirectProbe // failed. + returnedNackCounter := 0 + invalidResponseCounter := 0 + for { select { case msg := <-done: @@ -488,14 +486,13 @@ func (s *SWIM) indirectProbe(target *Member) error { return nil case *pb.Message_Nack: returnedNackCounter += 1 - if returnedNackCounter == k { + if returnedNackCounter == s.config.K { return ErrAllSentNackMsg } default: iLogger.Errorf(nil, "Invalid message type from [%s]", msg.Address) - invalidResponseCounter += 1 - if invalidResponseCounter == k { + if invalidResponseCounter == s.config.K { return ErrAllSentInvalidMsg } } @@ -503,28 +500,6 @@ func (s *SWIM) indirectProbe(target *Member) error { } } -// startIndirectPingRunner starts indirect-ping goroutine and goroutine which receive -// ACK message first from target sends signal to the other goroutines which share @ctx -func (s *SWIM) startIndirectPingRunner(ctx context.Context, responseChan chan<- pb.Message, mediator, target Member) { - done := make(chan pb.Message) - - go func() { - s.indirectPing(mediator, target, func(err error, msg pb.Message) { - if err != nil { - iLogger.Error(nil, err.Error()) - done <- pb.Message{} - } - done <- msg - }) - }() - - select { - case msg := <-done: - responseChan <- msg - case <-ctx.Done(): - } -} - // ping ping to member with piggyback message after sending ping message // the result can be: // 1. timeout @@ -558,11 +533,10 @@ func (s *SWIM) ping(target *Member) error { // otherwise just return ** // @ctx is for sending cancel signal from outside, when one of k-member successfully // received ACK message or when in the exceptional situation -func (s *SWIM) indirectPing(mediator, target Member, cb func(err error, msg pb.Message)) { +func (s *SWIM) indirectPing(mediator, target Member) (pb.Message, error) { stats, err := s.mbrStatsMsgStore.Get() if err != nil { - cb(err, pb.Message{}) - return + return pb.Message{}, err } // send indirect-ping message @@ -574,13 +548,12 @@ func (s *SWIM) indirectPing(mediator, target Member, cb func(err error, msg pb.M // when communicating member and target with indirect-ping failed, // just return. if err != nil { - cb(err, pb.Message{}) - return + return pb.Message{}, err } // update piggyback data to store s.handlePbk(res.PiggyBack) - cb(nil, res) + return res, nil } func (s *SWIM) suspect(member *Member) { diff --git a/swim_internal_test.go b/swim_internal_test.go index 94e1767..a38ee68 100644 --- a/swim_internal_test.go +++ b/swim_internal_test.go @@ -905,11 +905,7 @@ func TestSWIM_indirectPing_When_Response_Success(t *testing.T) { swim.mbrStatsMsgStore = pbkStore swim.messageEndpoint = messageEndpoint - callback := func(err error, msg pb.Message) { - assert.NoError(t, err) - assert.Equal(t, msg, ack) - } - swim.indirectPing(*mKMember, *mJMember, callback) + swim.indirectPing(*mKMember, *mJMember) } func TestSWIM_indirectPing_When_Response_Failed(t *testing.T) { @@ -976,11 +972,7 @@ func TestSWIM_indirectPing_When_Response_Failed(t *testing.T) { swim.mbrStatsMsgStore = pbkStore swim.messageEndpoint = messageEndpoint - callback := func(err error, msg pb.Message) { - assert.Error(t, err, ErrSendTimeout) - assert.Equal(t, msg, pb.Message{}) - } - swim.indirectPing(*mKMember, *mJMember, callback) + swim.indirectPing(*mKMember, *mJMember) } func TestSWIM_indirectPing_When_One_Of_Other_Member_Sent_Ack(t *testing.T) { @@ -1062,10 +1054,7 @@ func TestSWIM_indirectPing_When_One_Of_Other_Member_Sent_Ack(t *testing.T) { swim.mbrStatsMsgStore = pbkStore swim.messageEndpoint = messageEndpoint - callback := func(err error, msg pb.Message) { - - } - swim.indirectPing(*member, *target, callback) + swim.indirectPing(*member, *target) } func TestSWIM_ping_When_Response_Success(t *testing.T) { @@ -1465,9 +1454,10 @@ func TestSWIM_indirectProbe_When_Some_Member_Sent_Nack_Message(t *testing.T) { // half of the members will send to m_i ACK message i := 1 mJMessageHandler.handleFunc = func(msg pb.Message) { - if i%2 == 0 { + if i == 1 { ack := pb.Message{Id: msg.Id, Payload: &pb.Message_Ack{Ack: &pb.Ack{}}, PiggyBack: &pb.PiggyBack{}} - mJMessageEndpoint.Send(msg.Address, ack) + err := mJMessageEndpoint.Send(msg.Address, ack) + assert.NoError(t, err) } i++ } @@ -1544,7 +1534,8 @@ func TestSWIM_probe_When_Member_Is_Dead(t *testing.T) { ID: MemberID{ID: "111"}, Status: Dead, } - swim.probe(member) + T := time.NewTimer(time.Second) + swim.probe(member, T) } func TestSWIM_probe_When_Target_Respond_To_Ping(t *testing.T) { @@ -1594,7 +1585,7 @@ func TestSWIM_probe_When_Target_Respond_To_Ping(t *testing.T) { // setup SWIM's message endpoint tc := PacketTransportConfig{ BindAddress: "127.0.0.1", - BindPort: 11162, + BindPort: 13162, } p, _ := NewPacketTransport(&tc) @@ -1610,7 +1601,8 @@ func TestSWIM_probe_When_Target_Respond_To_Ping(t *testing.T) { go mIMessageEndpoint.Listen() defer mIMessageEndpoint.Shutdown() - swim.probe(mJMember) + T := time.NewTimer(time.Second * 5) + swim.probe(mJMember, T) assert.Equal(t, swim.awareness.score, 1) } @@ -1629,16 +1621,16 @@ func TestSWIM_probe_When_Target_Respond_To_Ping(t *testing.T) { // 7. M_I successfully finish indirect-probing // func TestSWIM_probe_When_Target_Respond_To_Indirect_Ping(t *testing.T) { - mJAddress := "127.0.0.1:11161" - mIAddress := "127.0.0.1:11162" - mK1Address := "127.0.0.1:11163" - mK2Address := "127.0.0.1:11164" + mJAddress := "127.0.0.1:13161" + mIAddress := "127.0.0.1:13162" + mK1Address := "127.0.0.1:13163" + mK2Address := "127.0.0.1:13164" // // setup M_J message endpoint, SWIM // mJMessageHandler := &MockMessageHandler{} - mJMessageEndpoint := createMessageEndpoint(t, mJMessageHandler, time.Second, 11161) + mJMessageEndpoint := createMessageEndpoint(t, mJMessageHandler, time.Second, 13161) // only ignore first message i := 0 @@ -1661,7 +1653,7 @@ func TestSWIM_probe_When_Target_Respond_To_Indirect_Ping(t *testing.T) { // config := &Config{ BindAddress: "127.0.0.1", - BindPort: 11162, + BindPort: 13162, K: 2, T: 5000, } @@ -1674,11 +1666,11 @@ func TestSWIM_probe_When_Target_Respond_To_Indirect_Ping(t *testing.T) { mI := &Member{ ID: MemberID{ID: "mI"}, Addr: net.ParseIP("127.0.0.1"), - Port: 11162, + Port: 13162, } - m1Member := &Member{ID: MemberID{ID: "m1"}, Addr: net.ParseIP("127.0.0.1"), Port: 11163, Status: Alive} - m2Member := &Member{ID: MemberID{ID: "m2"}, Addr: net.ParseIP("127.0.0.1"), Port: 11164, Status: Alive} + m1Member := &Member{ID: MemberID{ID: "m1"}, Addr: net.ParseIP("127.0.0.1"), Port: 13163, Status: Alive} + m2Member := &Member{ID: MemberID{ID: "m2"}, Addr: net.ParseIP("127.0.0.1"), Port: 13164, Status: Alive} mm := NewMemberMap(&SuspicionConfig{}) mm.members[m1Member.ID] = m1Member @@ -1717,12 +1709,12 @@ func TestSWIM_probe_When_Target_Respond_To_Indirect_Ping(t *testing.T) { // setup M_K_1 message endpoint // mk1MessageHandler := &MockMessageHandler{} - mk1MessageEndpoint := createMessageEndpoint(t, mk1MessageHandler, time.Second, 11163) + mk1MessageEndpoint := createMessageEndpoint(t, mk1MessageHandler, time.Second, 13163) mk1MessageHandler.handleFunc = func(msg pb.Message) { // test whether received message type is indirect-ping - assert.Equal(t, msg.Payload.(*pb.Message_IndirectPing).IndirectPing, &pb.IndirectPing{Target: "127.0.0.1:11161"}) - assert.Equal(t, msg.Payload.(*pb.Message_IndirectPing).IndirectPing.Target, "127.0.0.1:11161") + assert.Equal(t, msg.Payload.(*pb.Message_IndirectPing).IndirectPing, &pb.IndirectPing{Target: "127.0.0.1:13161"}) + assert.Equal(t, msg.Payload.(*pb.Message_IndirectPing).IndirectPing.Target, "127.0.0.1:13161") assert.Equal(t, msg.Address, mIAddress) ping := createPingMessage(mK1Address, &pb.MbrStatsMsg{}) @@ -1730,6 +1722,7 @@ func TestSWIM_probe_When_Target_Respond_To_Indirect_Ping(t *testing.T) { mk1MessageEndpoint.SyncSend(mJAddress, ping) ack := createAckMessage(msg.Id, &pb.MbrStatsMsg{}) // send back to mi + time.Sleep(time.Second) mk1MessageEndpoint.Send(mIAddress, ack) return } @@ -1738,12 +1731,12 @@ func TestSWIM_probe_When_Target_Respond_To_Indirect_Ping(t *testing.T) { // setup M_K_2 message endpoint, // mk2MessageHandler := &MockMessageHandler{} - mk2MessageEndpoint := createMessageEndpoint(t, mk2MessageHandler, time.Second, 11164) + mk2MessageEndpoint := createMessageEndpoint(t, mk2MessageHandler, time.Second, 13164) mk2MessageHandler.handleFunc = func(msg pb.Message) { // test whether received message type is indirect-ping - assert.Equal(t, msg.Payload.(*pb.Message_IndirectPing).IndirectPing, &pb.IndirectPing{Target: "127.0.0.1:11161"}) - assert.Equal(t, msg.Payload.(*pb.Message_IndirectPing).IndirectPing.Target, "127.0.0.1:11161") + assert.Equal(t, msg.Payload.(*pb.Message_IndirectPing).IndirectPing, &pb.IndirectPing{Target: "127.0.0.1:13161"}) + assert.Equal(t, msg.Payload.(*pb.Message_IndirectPing).IndirectPing.Target, "127.0.0.1:13161") assert.Equal(t, msg.Address, mIAddress) ping := createPingMessage(mK2Address, &pb.MbrStatsMsg{}) @@ -1766,8 +1759,9 @@ func TestSWIM_probe_When_Target_Respond_To_Indirect_Ping(t *testing.T) { mk2MessageEndpoint.Shutdown() }() - mJMember := Member{ID: MemberID{ID: "mj"}, Addr: net.ParseIP("127.0.0.1"), Port: 11161, Status: Alive} - swim.probe(mJMember) + mJMember := Member{ID: MemberID{ID: "mj"}, Addr: net.ParseIP("127.0.0.1"), Port: 13161, Status: Alive} + T := time.NewTimer(time.Second * 5) + swim.probe(mJMember, T) assert.Equal(t, swim.awareness.score, 1) } @@ -1903,6 +1897,7 @@ func TestSWIM_probe_When_Target_Not_Respond_To_Indirect_Ping(t *testing.T) { assert.Error(t, err, ErrSendTimeout) nack := createNackMessage(msg.Id, &pb.MbrStatsMsg{}) + // send back to mi mk2MessageEndpoint.Send(mIAddress, nack) return @@ -1920,12 +1915,162 @@ func TestSWIM_probe_When_Target_Not_Respond_To_Indirect_Ping(t *testing.T) { }() mJMember := Member{ID: MemberID{ID: "mj"}, Addr: net.ParseIP("127.0.0.1"), Port: 11161, Status: Alive} - - swim.probe(mJMember) + T := time.NewTimer(time.Second * 5) + swim.probe(mJMember, T) assert.Equal(t, swim.awareness.score, 3) } +func TestSWIM_startFailureDetector_When_TimedOut(t *testing.T) { + // setup M_J member + mJMember := Member{ID: MemberID{ID: "mj"}, Addr: net.ParseIP("127.0.0.1"), Port: 11161, Status: Alive} + mJMessageHandler := &MockMessageHandler{} + + mJMessageEndpoint := createMessageEndpoint(t, mJMessageHandler, time.Second, 11161) + mJMessageHandler.handleFunc = func(msg pb.Message) { + ack := pb.Message{Id: msg.Id, Payload: &pb.Message_Ack{Ack: &pb.Ack{}}, PiggyBack: &pb.PiggyBack{}} + + time.Sleep(time.Second * 1) + + mJMessageEndpoint.Send("127.0.0.1:11162", ack) + } + go mJMessageEndpoint.Listen() + defer mJMessageEndpoint.Shutdown() + + // setup M_I + config := &Config{ + BindAddress: "127.0.0.1", + BindPort: 11162, + K: 2, + T: 2000, + } + + pbkStore := &MockMbrStatsMsgStore{} + pbkStore.GetFunc = func() (pb.MbrStatsMsg, error) { + return pb.MbrStatsMsg{}, nil + } + + mI := &Member{ + ID: MemberID{ID: "mI"}, + Addr: net.ParseIP("127.0.0.1"), + Port: 11162, + } + + mm := NewMemberMap(&SuspicionConfig{}) + mm.members[mJMember.ID] = &mJMember + + awareness := NewAwareness(8) + awareness.ApplyDelta(2) + + swim := &SWIM{} + swim.member = mI + swim.awareness = awareness + swim.config = config + swim.mbrStatsMsgStore = pbkStore + swim.memberMap = mm + + // setup SWIM's message endpoint + tc := PacketTransportConfig{ + BindAddress: "127.0.0.1", + BindPort: 11162, + } + p, _ := NewPacketTransport(&tc) + + meConfig := MessageEndpointConfig{ + EncryptionEnabled: false, + SendTimeout: time.Second * 10, + CallbackCollectInterval: time.Hour, + } + mIMessageEndpoint, _ := NewMessageEndpoint(meConfig, p, swim) + + swim.messageEndpoint = mIMessageEndpoint + + go mIMessageEndpoint.Listen() + defer mIMessageEndpoint.Shutdown() + + T := time.NewTimer(time.Second * 5) + + go swim.startFailureDetector() + + select { + case <-T.C: + } +} + +func TestSWIM_startFailureDetector_When_Success_Probe(t *testing.T) { + // setup M_J member + mJMember := Member{ID: MemberID{ID: "mj"}, Addr: net.ParseIP("127.0.0.1"), Port: 11161, Status: Alive} + mJMessageHandler := &MockMessageHandler{} + + mJMessageEndpoint := createMessageEndpoint(t, mJMessageHandler, time.Second, 11161) + mJMessageHandler.handleFunc = func(msg pb.Message) { + ack := pb.Message{Id: msg.Id, Payload: &pb.Message_Ack{Ack: &pb.Ack{}}, PiggyBack: &pb.PiggyBack{}} + + mJMessageEndpoint.Send("127.0.0.1:11162", ack) + } + go mJMessageEndpoint.Listen() + defer mJMessageEndpoint.Shutdown() + + // setup M_I + config := &Config{ + BindAddress: "127.0.0.1", + BindPort: 11162, + K: 2, + T: 3000, + } + + pbkStore := &MockMbrStatsMsgStore{} + pbkStore.GetFunc = func() (pb.MbrStatsMsg, error) { + return pb.MbrStatsMsg{}, nil + } + + mI := &Member{ + ID: MemberID{ID: "mI"}, + Addr: net.ParseIP("127.0.0.1"), + Port: 11162, + } + + mm := NewMemberMap(&SuspicionConfig{}) + mm.members[mJMember.ID] = &mJMember + + awareness := NewAwareness(8) + awareness.ApplyDelta(2) + + swim := &SWIM{} + swim.member = mI + swim.awareness = awareness + swim.config = config + swim.mbrStatsMsgStore = pbkStore + swim.memberMap = mm + + // setup SWIM's message endpoint + tc := PacketTransportConfig{ + BindAddress: "127.0.0.1", + BindPort: 11162, + } + p, _ := NewPacketTransport(&tc) + + meConfig := MessageEndpointConfig{ + EncryptionEnabled: false, + SendTimeout: time.Second * 10, + CallbackCollectInterval: time.Hour, + } + mIMessageEndpoint, _ := NewMessageEndpoint(meConfig, p, swim) + + swim.messageEndpoint = mIMessageEndpoint + + go mIMessageEndpoint.Listen() + defer mIMessageEndpoint.Shutdown() + + T := time.NewTimer(time.Second * 5) + + go swim.startFailureDetector() + + select { + case <-T.C: + } +} + func createMessageEndpoint(t *testing.T, messageHandler MessageHandler, sendTimeout time.Duration, port int) MessageEndpoint { mConfig := MessageEndpointConfig{ EncryptionEnabled: false,