diff --git a/swim.go b/swim.go index a380943..2d17b1e 100644 --- a/swim.go +++ b/swim.go @@ -19,8 +19,6 @@ package swim import ( "context" "errors" - "net" - "strconv" "sync" "time" @@ -306,12 +304,6 @@ func (s *SWIM) ShutDown() { s.quitFD <- struct{}{} } -// address returns local-node (myself) address -// TODO: I think there's better way to store local-node(myself) status -func (s *SWIM) address() string { - return net.JoinHostPort(s.config.BindAddress, strconv.Itoa(s.config.BindPort)) -} - // Total Failure Detection is performed for each` T`. (ref: https://github.com/DE-labtory/swim/edit/develop/docs/Docs.md) // // 1. SWIM randomly selects a member(j) in the memberMap and ping to the member(j). @@ -382,7 +374,7 @@ func (s *SWIM) probe(member Member) { end <- struct{}{} return } - if err == ErrSendTimeout { + if err != ErrSendTimeout { iLogger.Errorf(nil, "Error occurred when ping failed [%s]", err.Error()) return } @@ -398,25 +390,23 @@ func (s *SWIM) probe(member Member) { T := time.NewTimer(time.Millisecond * time.Duration(s.config.T)) - for { - select { - // if successfully probed target member, then just return - case <-end: - iLogger.Infof(nil, "probe member [%s] successfully finished", member.ID) - return + select { + // if successfully probed target member, then just return + case <-end: + iLogger.Infof(nil, "probe member [%s] successfully finished", member.ID) + return - // if probe failed, then suspect member - case <-failed: - iLogger.Infof(nil, "probe member [%s] failed, start suspect", member.ID) - s.suspect(&member) - return + // if probe failed, then suspect member + case <-failed: + iLogger.Infof(nil, "probe member [%s] failed, start suspect", member.ID) + s.suspect(&member) + return - // if timed-out, then suspect member - case <-T.C: - iLogger.Infof(nil, "probe member [%s] failed, start suspect", member.ID) - s.suspect(&member) - return - } + // if timed-out, then suspect member + case <-T.C: + iLogger.Infof(nil, "probe member [%s] failed, start suspect", member.ID) + s.suspect(&member) + return } } @@ -431,25 +421,28 @@ func (s *SWIM) indirectProbe(target *Member) error { returnedNackCounter := 0 invalidResponseCounter := 0 + // select k-random member from member map, then sends indirect-ping + k := s.config.K + + wg.Add(k) + // with cancel we can send the signal to goroutines which share // this @ctx context ctx, cancel := context.WithCancel(context.Background()) - indResponse := make(chan pb.Message) + done := make(chan pb.Message, k) defer func() { cancel() - close(indResponse) + close(done) }() - // select k-random member from member map, then sends indirect-ping - k := s.config.K - - wg.Add(k) - kMembers := s.memberMap.SelectKRandomMemberID(k) for _, member := range kMembers { - go s.indirectPing(ctx, wg, &member, target, indResponse) + go func() { + s.startIndirectPingRunner(ctx, done, member, *target) + wg.Done() + }() } // wait until k-random member sends back response, if response message @@ -461,7 +454,7 @@ func (s *SWIM) indirectProbe(target *Member) error { for { select { - case msg := <-indResponse: + case msg := <-done: switch msg.Payload.(type) { case *pb.Message_Ack: // if one of members received ACK message then send the cancel @@ -487,6 +480,28 @@ func (s *SWIM) indirectProbe(target *Member) error { } } +// startIndirectPingRunner starts indirect-ping goroutine and goroutine which receive +// ACK message first from target sends signal to the other goroutines which share @ctx +func (s *SWIM) startIndirectPingRunner(ctx context.Context, responseChan chan<- pb.Message, mediator, target Member) { + done := make(chan pb.Message) + + go func() { + s.indirectPing(mediator, target, func(err error, msg pb.Message) { + if err != nil { + iLogger.Error(nil, err.Error()) + done <- pb.Message{} + } + done <- msg + }) + }() + + select { + case msg := <-done: + responseChan <- msg + case <-ctx.Done(): + } +} + // ping ping to member with piggyback message after sending ping message // the result can be: // 1. timeout @@ -502,7 +517,7 @@ func (s *SWIM) ping(target *Member) error { // send ping message addr := target.Address() - ping := createPingMessage(s.address(), &stats) + ping := createPingMessage(s.member.Address(), &stats) res, err := s.messageEndpoint.SyncSend(addr, ping) if err != nil { @@ -520,44 +535,29 @@ func (s *SWIM) ping(target *Member) error { // otherwise just return ** // @ctx is for sending cancel signal from outside, when one of k-member successfully // received ACK message or when in the exceptional situation -func (s *SWIM) indirectPing(ctx context.Context, wg *sync.WaitGroup, mediator, target *Member, indSucc chan pb.Message) { - defer wg.Done() - +func (s *SWIM) indirectPing(mediator, target Member, cb func(err error, msg pb.Message)) { stats, err := s.mbrStatsMsgStore.Get() if err != nil { - iLogger.Error(nil, err.Error()) + cb(err, pb.Message{}) + return } - done := make(chan pb.Message) - // send indirect-ping message addr := mediator.Address() - ind := createIndMessage(s.address(), target.Address(), &stats) - - go func() { - res, err := s.messageEndpoint.SyncSend(addr, ind) - - // when communicating member and target with indirect-ping failed, - // just return. - if err != nil { - iLogger.Error(nil, err.Error()) - return - } - // update piggyback data to store - s.handlePbk(res.PiggyBack) - done <- res - }() + ind := createIndMessage(s.member.Address(), target.Address(), &stats) - select { - case res := <-done: - indSucc <- res - return + res, err := s.messageEndpoint.SyncSend(addr, ind) - // when one of the k-member received ACK message then - // cancel my indirect-probe - case <-ctx.Done(): + // when communicating member and target with indirect-ping failed, + // just return. + if err != nil { + cb(err, pb.Message{}) return } + // update piggyback data to store + s.handlePbk(res.PiggyBack) + + cb(nil, res) } func (s *SWIM) suspect(member *Member) { @@ -605,7 +605,6 @@ func (s *SWIM) handle(msg pb.Message) { // handle piggyback related to member status func (s *SWIM) handlePbk(piggyBack *pb.PiggyBack) { - mbrStatsMsg := piggyBack.MbrStatsMsg // Check if piggyback message changes memberMap. @@ -649,13 +648,12 @@ func (s *SWIM) handleIndirectPing(msg pb.Message) { // address of indirect-ping's target targetAddr := msg.Payload.(*pb.Message_IndirectPing).IndirectPing.Target - ping := createPingMessage(srcAddr, &mbrStatsMsg) + ping := createPingMessage(s.member.Address(), &mbrStatsMsg) // first send the ping to target member, if target member could not send-back // ack message for whatever reason send nack message to source member, // if successfully received ack message from target, then send back ack message // to source member - if _, err := s.messageEndpoint.SyncSend(targetAddr, ping); err != nil { nack := createNackMessage(id, &mbrStatsMsg) if err := s.messageEndpoint.Send(srcAddr, nack); err != nil { diff --git a/swim_internal_test.go b/swim_internal_test.go index e753784..cae86dd 100644 --- a/swim_internal_test.go +++ b/swim_internal_test.go @@ -17,7 +17,6 @@ package swim import ( - "context" "net" "sync" "testing" @@ -456,11 +455,17 @@ func TestSWIM_handleIndirectPing(t *testing.T) { }, nil } + self := &Member{ + ID: MemberID{ID: "mi"}, + Addr: net.ParseIP("127.0.0.1"), + Port: uint16(11146), + } config := &Config{ BindAddress: "127.0.0.1", - BindPort: 11140, + BindPort: 11146, } swim := SWIM{} + swim.member = self swim.config = config mIMessageHandler := MockMessageHandler{} @@ -541,20 +546,27 @@ func TestSWIM_handleIndirectPing_Target_Timeout(t *testing.T) { }, nil } + self := &Member{ + ID: MemberID{ID: "mi"}, + Addr: net.ParseIP("127.0.0.1"), + Port: uint16(11179), + } config := &Config{ BindAddress: "127.0.0.1", - BindPort: 11140, + BindPort: 11179, } + swim := SWIM{} + swim.member = self swim.config = config mIMessageHandler := MockMessageHandler{} mJMessageHandler := MockMessageHandler{} - mK := createMessageEndpoint(t, &swim, time.Second, 11148) + mK := createMessageEndpoint(t, &swim, time.Second, 11178) // source should have larger send timeout, because source should give mediator // enough time to ping to target - mI := createMessageEndpoint(t, &mIMessageHandler, time.Second*3, 11149) - mJ := createMessageEndpoint(t, &mJMessageHandler, time.Second, 11150) + mI := createMessageEndpoint(t, &mIMessageHandler, time.Second*3, 11179) + mJ := createMessageEndpoint(t, &mJMessageHandler, time.Second, 11180) swim.messageEndpoint = mK swim.mbrStatsMsgStore = mbrStatsMsgStore @@ -573,11 +585,11 @@ func TestSWIM_handleIndirectPing_Target_Timeout(t *testing.T) { ind := pb.Message{ Id: id, // address of source member - Address: "127.0.0.1:11149", + Address: "127.0.0.1:11179", Payload: &pb.Message_IndirectPing{ IndirectPing: &pb.IndirectPing{ // target address - Target: "127.0.0.1:11150", + Target: "127.0.0.1:11180", }, }, PiggyBack: &pb.PiggyBack{ @@ -601,7 +613,7 @@ func TestSWIM_handleIndirectPing_Target_Timeout(t *testing.T) { // DO NOT ANYTHING: do not response back to m } - resp, err := mI.SyncSend("127.0.0.1:11148", ind) + resp, err := mI.SyncSend("127.0.0.1:11178", ind) assert.NoError(t, err) assert.NotNil(t, resp.Payload.(*pb.Message_Nack)) assert.Equal(t, resp.Id, id) @@ -831,6 +843,23 @@ func TestSWIM_indirectPing_When_Response_Success(t *testing.T) { return *stats.MbrStatsMsg, nil } + ack := pb.Message{ + Id: "responseID", + Address: mJAddr, + Payload: &pb.Message_Ack{ + Ack: &pb.Ack{ + Payload: "ack-payload", + }, + }, + PiggyBack: &pb.PiggyBack{ + MbrStatsMsg: &pb.MbrStatsMsg{ + Type: pb.MbrStatsMsg_Alive, + Id: "555", + Incarnation: uint32(345), + Address: "pbk-addr2", + }, + }, + } messageEndpoint := MockMessageEndpoint{} messageEndpoint.SyncSendFunc = func(addr string, msg pb.Message) (pb.Message, error) { // this addr should be mediator address @@ -841,23 +870,7 @@ func TestSWIM_indirectPing_When_Response_Success(t *testing.T) { assert.Equal(t, msg.PiggyBack.MbrStatsMsg, stats) assert.Equal(t, msg.Payload.(*pb.Message_IndirectPing).IndirectPing.Target, "3.4.5.6:22222") - return pb.Message{ - Id: "responseID", - Address: mJAddr, - Payload: &pb.Message_Ack{ - Ack: &pb.Ack{ - Payload: "ack-payload", - }, - }, - PiggyBack: &pb.PiggyBack{ - MbrStatsMsg: &pb.MbrStatsMsg{ - Type: pb.MbrStatsMsg_Alive, - Id: "555", - Incarnation: uint32(345), - Address: "pbk-addr2", - }, - }, - }, nil + return ack, nil } mIMember := &Member{ @@ -892,18 +905,11 @@ func TestSWIM_indirectPing_When_Response_Success(t *testing.T) { swim.mbrStatsMsgStore = pbkStore swim.messageEndpoint = messageEndpoint - ctx, _ := context.WithCancel(context.Background()) - wg := &sync.WaitGroup{} - wg.Add(1) - - go swim.indirectPing(ctx, wg, mKMember, mJMember, indSucc) - - select { - case msg := <-indSucc: - assert.Equal(t, msg.Id, "responseID") - assert.Equal(t, msg.Address, mJAddr) - assert.Equal(t, getAckPayload(msg), "ack-payload") + callback := func(err error, msg pb.Message) { + assert.NoError(t, err) + assert.Equal(t, msg, ack) } + swim.indirectPing(*mKMember, *mJMember, callback) } func TestSWIM_indirectPing_When_Response_Failed(t *testing.T) { @@ -970,20 +976,96 @@ func TestSWIM_indirectPing_When_Response_Failed(t *testing.T) { swim.mbrStatsMsgStore = pbkStore swim.messageEndpoint = messageEndpoint - ctx, _ := context.WithCancel(context.Background()) - wg := &sync.WaitGroup{} - wg.Add(1) + callback := func(err error, msg pb.Message) { + assert.Error(t, err, ErrSendTimeout) + assert.Equal(t, msg, pb.Message{}) + } + swim.indirectPing(*mKMember, *mJMember, callback) +} + +func TestSWIM_indirectPing_When_One_Of_Other_Member_Sent_Ack(t *testing.T) { + pbk := pb.PiggyBack{ + MbrStatsMsg: &pb.MbrStatsMsg{ + Type: pb.MbrStatsMsg_Alive, + Id: "123", + Incarnation: uint32(123), + Address: "pbk-addr1", + }, + } + pbkStore := MockMbrStatsMsgStore{} + pbkStore.PushFunc = func(pbk pb.MbrStatsMsg) {} + pbkStore.GetFunc = func() (pb.MbrStatsMsg, error) { + return *pbk.MbrStatsMsg, nil + } - go swim.indirectPing(ctx, wg, mKMember, mJMember, indSucc) + messageEndpoint := MockMessageEndpoint{} + messageEndpoint.SyncSendFunc = func(addr string, msg pb.Message) (pb.Message, error) { + // this addr should be mediator address + assert.Equal(t, addr, "1.2.3.4:11111") - T := time.NewTimer(time.Second) + // msg.Address should be local-node address + assert.Equal(t, msg.Address, "9.8.7.6:33333") + assert.Equal(t, msg.PiggyBack, &pbk) + assert.Equal(t, msg.Payload.(*pb.Message_IndirectPing).IndirectPing.Target, "3.4.5.6:22222") + + // sleep 1 seconds + time.Sleep(time.Second) + + return pb.Message{ + Id: "responseID", + Address: "3.4.5.6:22222", + Payload: &pb.Message_Ack{ + Ack: &pb.Ack{ + Payload: "ack-payload", + }, + }, + PiggyBack: &pb.PiggyBack{ + MbrStatsMsg: &pb.MbrStatsMsg{ + Type: pb.MbrStatsMsg_Alive, + Id: "555", + Incarnation: uint32(345), + Address: "pbk-addr2", + }, + }, + }, nil + } + + self := &Member{ + ID: MemberID{ID: "memberID"}, + Addr: net.ParseIP("9.8.7.6"), + Port: uint16(33333), + } + member := &Member{ + ID: MemberID{ID: "memberID"}, + Addr: net.ParseIP("1.2.3.4"), + Port: uint16(11111), + } + target := &Member{ + ID: MemberID{ID: "targetID"}, + Addr: net.ParseIP("3.4.5.6"), + Port: uint16(22222), + } + + indSucc := make(chan pb.Message) + defer func() { + close(indSucc) + }() + + config := &Config{ + BindAddress: "9.8.7.6", + BindPort: 33333, + } + + swim := &SWIM{} + swim.member = self + swim.config = config + swim.mbrStatsMsgStore = pbkStore + swim.messageEndpoint = messageEndpoint + + callback := func(err error, msg pb.Message) { - select { - case <-indSucc: - panic("This shouldn't be called") - case <-T.C: - return } + swim.indirectPing(*member, *target, callback) } func TestSWIM_ping_When_Response_Success(t *testing.T) { @@ -1122,56 +1204,251 @@ func TestSWIM_ping_When_Response_Failed(t *testing.T) { // test when one of k-members response with other than ACK or NACK func TestSWIM_indirectProbe_When_Successfully_Probed(t *testing.T) { - m1 := &Member{ID: MemberID{ID: "m1"}, Addr: net.ParseIP("127.0.0.1"), Port: 11151, Status: Alive} - m2 := &Member{ID: MemberID{ID: "m2"}, Addr: net.ParseIP("127.0.0.1"), Port: 11152, Status: Alive} + mIAddr := "127.0.0.1:11184" + mJAddr := "127.0.0.1:11183" + mK1Addr := "127.0.0.1:11181" + mK2Addr := "127.0.0.1:11182" + + // Setup M_K1 + mK1MessageHandler := &MockMessageHandler{} + mK1MessageEndpoint := createMessageEndpoint(t, mK1MessageHandler, time.Second*2, 11181) + mK1MessageHandler.handleFunc = func(msg pb.Message) { + ping := createPingMessage(mK1Addr, &pb.MbrStatsMsg{}) + _, err := mK1MessageEndpoint.SyncSend(mJAddr, ping) + assert.NoError(t, err) + + ack := pb.Message{Id: msg.Id, Payload: &pb.Message_Ack{Ack: &pb.Ack{}}, PiggyBack: &pb.PiggyBack{}} + err = mK1MessageEndpoint.Send(mIAddr, ack) + assert.NoError(t, err) + } + + // Setup M_K2 + mK2MessageHandler := &MockMessageHandler{} + mK2MessageEndpoint := createMessageEndpoint(t, mK2MessageHandler, time.Second*2, 11182) + mK2MessageHandler.handleFunc = func(msg pb.Message) { + ping := createPingMessage(mK2Addr, &pb.MbrStatsMsg{}) + mK2MessageEndpoint.SyncSend(mJAddr, ping) + + ack := pb.Message{Id: msg.Id, Payload: &pb.Message_Ack{Ack: &pb.Ack{}}, PiggyBack: &pb.PiggyBack{}} + mK2MessageEndpoint.Send(mIAddr, ack) + } + + go mK1MessageEndpoint.Listen() + go mK2MessageEndpoint.Listen() + defer func() { + mK1MessageEndpoint.Shutdown() + mK2MessageEndpoint.Shutdown() + }() + + mJMessageHandler := &MockMessageHandler{} + mJMessageEndpoint := createMessageEndpoint(t, mJMessageHandler, time.Second, 11183) + go mJMessageEndpoint.Listen() + defer mJMessageEndpoint.Shutdown() + + mJMessageHandler.handleFunc = func(msg pb.Message) { + ack := pb.Message{Id: msg.Id, Payload: &pb.Message_Ack{Ack: &pb.Ack{}}, PiggyBack: &pb.PiggyBack{}} + mJMessageEndpoint.Send(msg.Address, ack) + } + mJ := &Member{ + ID: MemberID{ID: "mJ"}, + Addr: net.ParseIP("127.0.0.1"), + Port: 11183, + } + + // setup local-node + mI := &Member{ + ID: MemberID{ID: "mJ"}, + Addr: net.ParseIP("127.0.0.1"), + Port: 11184, + } + config := &Config{ + BindAddress: "127.0.0.1", + BindPort: 11184, + K: 2, + } + + pbkStore := MockMbrStatsMsgStore{} + pbkStore.GetFunc = func() (pb.MbrStatsMsg, error) { + return pb.MbrStatsMsg{}, nil + } + + m1 := &Member{ID: MemberID{ID: "m1"}, Addr: net.ParseIP("127.0.0.1"), Port: 11181, Status: Alive} + m2 := &Member{ID: MemberID{ID: "m2"}, Addr: net.ParseIP("127.0.0.1"), Port: 11182, Status: Alive} mm := NewMemberMap(&SuspicionConfig{}) mm.members[m1.ID] = m1 mm.members[m2.ID] = m2 + swim := &SWIM{} + swim.member = mI + swim.config = config + swim.mbrStatsMsgStore = pbkStore + swim.memberMap = mm + + // setup SWIM's message endpoint + tc := PacketTransportConfig{ + BindAddress: "127.0.0.1", + BindPort: 11184, + } + p, _ := NewPacketTransport(&tc) + + meConfig := MessageEndpointConfig{ + EncryptionEnabled: false, + SendTimeout: time.Second * 3, + CallbackCollectInterval: time.Hour, + } + messageEndpoint, _ := NewMessageEndpoint(meConfig, p, swim) + + swim.messageEndpoint = messageEndpoint + + go messageEndpoint.Listen() + defer messageEndpoint.Shutdown() + + err := swim.indirectProbe(mJ) + assert.NoError(t, err) +} + +func TestSWIM_indirectProbe_When_All_Sent_Nack_Message(t *testing.T) { + mIAddr := "127.0.0.1:11394" + mK1Addr := "127.0.0.1:11391" + mK2Addr := "127.0.0.1:11392" + mJAddr := "127.0.0.1:11393" + // Setup M_K1 - mK1pbkStore := MockMbrStatsMsgStore{} - mK1pbkStore.GetFunc = func() (pb.MbrStatsMsg, error) { - return pb.MbrStatsMsg{}, nil + mK1MessageHandler := &MockMessageHandler{} + mK1MessageEndpoint := createMessageEndpoint(t, mK1MessageHandler, time.Second*2, 11391) + mK1MessageHandler.handleFunc = func(msg pb.Message) { + ping := createPingMessage(mK1Addr, &pb.MbrStatsMsg{}) + _, err := mK1MessageEndpoint.SyncSend(mJAddr, ping) + assert.Error(t, err, ErrSendTimeout) + + nack := pb.Message{Id: msg.Id, Payload: &pb.Message_Nack{Nack: &pb.Nack{}}, PiggyBack: &pb.PiggyBack{}} + err = mK1MessageEndpoint.Send(mIAddr, nack) + assert.NoError(t, err) } - mK1 := &Member{ + + // Setup M_K2 + mK2MessageHandler := &MockMessageHandler{} + mK2MessageEndpoint := createMessageEndpoint(t, mK2MessageHandler, time.Second*2, 11392) + mK2MessageHandler.handleFunc = func(msg pb.Message) { + ping := createPingMessage(mK2Addr, &pb.MbrStatsMsg{}) + mK2MessageEndpoint.SyncSend(mJAddr, ping) + + nack := pb.Message{Id: msg.Id, Payload: &pb.Message_Nack{Nack: &pb.Nack{}}, PiggyBack: &pb.PiggyBack{}} + mK2MessageEndpoint.Send(mIAddr, nack) + } + + go mK1MessageEndpoint.Listen() + go mK2MessageEndpoint.Listen() + defer func() { + mK1MessageEndpoint.Shutdown() + mK2MessageEndpoint.Shutdown() + }() + + // setup M_J + mJMessageHandler := &MockMessageHandler{} + mJMessageEndpoint := createMessageEndpoint(t, mJMessageHandler, time.Second, 11393) + go mJMessageEndpoint.Listen() + defer mJMessageEndpoint.Shutdown() + + mJMessageHandler.handleFunc = func(msg pb.Message) { + // m_j send nothing + } + mJ := &Member{ ID: MemberID{ID: "mJ"}, Addr: net.ParseIP("127.0.0.1"), - Port: 11151, + Port: 11393, } - mK1Config := &Config{ + + // setup local-node + + config := &Config{ BindAddress: "127.0.0.1", - BindPort: 11151, + BindPort: 11394, + K: 2, + T: 5000, } - mK1SWIM := &SWIM{} - mK1SWIM.member = mK1 - mK1SWIM.config = mK1Config - mK1SWIM.mbrStatsMsgStore = mK1pbkStore - mK1MessageEndpoint := createMessageEndpoint(t, mK1SWIM, time.Second*2, 11151) - mK1SWIM.messageEndpoint = mK1MessageEndpoint - - // Setup M_K2 - mK2pbkStore := MockMbrStatsMsgStore{} - mK1pbkStore.GetFunc = func() (pb.MbrStatsMsg, error) { + pbkStore := MockMbrStatsMsgStore{} + pbkStore.GetFunc = func() (pb.MbrStatsMsg, error) { return pb.MbrStatsMsg{}, nil } - mK2 := &Member{ - ID: MemberID{ID: "mJ"}, + + m1 := &Member{ID: MemberID{ID: "m1"}, Addr: net.ParseIP("127.0.0.1"), Port: 11391, Status: Alive} + m2 := &Member{ID: MemberID{ID: "m2"}, Addr: net.ParseIP("127.0.0.1"), Port: 11392, Status: Alive} + + mm := NewMemberMap(&SuspicionConfig{}) + mm.members[m1.ID] = m1 + mm.members[m2.ID] = m2 + + mI := &Member{ + ID: MemberID{ID: "mIAA"}, Addr: net.ParseIP("127.0.0.1"), - Port: 11152, + Port: 11394, } - mK2config := &Config{ + + swim := &SWIM{} + swim.member = mI + swim.config = config + swim.mbrStatsMsgStore = pbkStore + swim.memberMap = mm + + // setup SWIM's message endpoint + tc := PacketTransportConfig{ BindAddress: "127.0.0.1", - BindPort: 11152, + BindPort: 11394, } - mK2SWIM := &SWIM{} - mK2SWIM.member = mK2 - mK2SWIM.config = mK2config - mK2SWIM.mbrStatsMsgStore = mK2pbkStore + p, _ := NewPacketTransport(&tc) - mK2MessageEndpoint := createMessageEndpoint(t, mK1SWIM, time.Second*2, 11152) - mK2SWIM.messageEndpoint = mK2MessageEndpoint + meConfig := MessageEndpointConfig{ + EncryptionEnabled: false, + SendTimeout: time.Second * 3, + CallbackCollectInterval: time.Hour, + } + messageEndpoint, _ := NewMessageEndpoint(meConfig, p, swim) + + swim.messageEndpoint = messageEndpoint + + end, indFailed := make(chan struct{}), make(chan struct{}) + defer func() { + close(end) + close(indFailed) + }() + + go messageEndpoint.Listen() + defer messageEndpoint.Shutdown() + + err := swim.indirectProbe(mJ) + assert.Error(t, err, ErrAllSentNackMsg) +} + +func TestSWIM_indirectProbe_When_Some_Member_Sent_Nack_Message(t *testing.T) { + mIAddr := "127.0.0.1:11254" + mK1Addr := "127.0.0.1:11251" + mK2Addr := "127.0.0.1:11252" + mJAddr := "127.0.0.1:11253" + + // Setup M_K1 + mK1MessageHandler := &MockMessageHandler{} + mK1MessageEndpoint := createMessageEndpoint(t, mK1MessageHandler, time.Second*2, 11251) + mK1MessageHandler.handleFunc = func(msg pb.Message) { + ping := createPingMessage(mK1Addr, &pb.MbrStatsMsg{}) + mK1MessageEndpoint.SyncSend(mJAddr, ping) + + ack := pb.Message{Id: msg.Id, Payload: &pb.Message_Ack{Ack: &pb.Ack{}}, PiggyBack: &pb.PiggyBack{}} + mK1MessageEndpoint.Send(mIAddr, ack) + } + + // Setup M_K2 + mK2MessageHandler := &MockMessageHandler{} + mK2MessageEndpoint := createMessageEndpoint(t, mK2MessageHandler, time.Second*2, 11252) + mK2MessageHandler.handleFunc = func(msg pb.Message) { + ping := createPingMessage(mK2Addr, &pb.MbrStatsMsg{}) + mK2MessageEndpoint.SyncSend(mJAddr, ping) + + nack := pb.Message{Id: msg.Id, Payload: &pb.Message_Nack{Nack: &pb.Nack{}}, PiggyBack: &pb.PiggyBack{}} + mK2MessageEndpoint.Send(mIAddr, nack) + } go mK1MessageEndpoint.Listen() go mK2MessageEndpoint.Listen() @@ -1181,29 +1458,30 @@ func TestSWIM_indirectProbe_When_Successfully_Probed(t *testing.T) { }() mJMessageHandler := &MockMessageHandler{} - mJMessageEndpoint := createMessageEndpoint(t, mJMessageHandler, time.Second, 11153) + mJMessageEndpoint := createMessageEndpoint(t, mJMessageHandler, time.Second, 11253) go mJMessageEndpoint.Listen() defer mJMessageEndpoint.Shutdown() + // half of the members will send to m_i ACK message + i := 1 mJMessageHandler.handleFunc = func(msg pb.Message) { - ack := pb.Message{Id: msg.Id, Payload: &pb.Message_Ack{Ack: &pb.Ack{}}, PiggyBack: &pb.PiggyBack{}} - mJMessageEndpoint.Send(msg.Address, ack) + if i%2 == 0 { + ack := pb.Message{Id: msg.Id, Payload: &pb.Message_Ack{Ack: &pb.Ack{}}, PiggyBack: &pb.PiggyBack{}} + mJMessageEndpoint.Send(msg.Address, ack) + } + i++ } mJ := &Member{ ID: MemberID{ID: "mJ"}, Addr: net.ParseIP("127.0.0.1"), - Port: 11153, + Port: 11253, } // setup local-node - mI := &Member{ - ID: MemberID{ID: "mJ"}, - Addr: net.ParseIP("127.0.0.1"), - Port: 11154, - } + config := &Config{ BindAddress: "127.0.0.1", - BindPort: 11154, + BindPort: 11254, K: 2, } @@ -1212,6 +1490,19 @@ func TestSWIM_indirectProbe_When_Successfully_Probed(t *testing.T) { return pb.MbrStatsMsg{}, nil } + m1 := &Member{ID: MemberID{ID: "m1"}, Addr: net.ParseIP("127.0.0.1"), Port: 11251, Status: Alive} + m2 := &Member{ID: MemberID{ID: "m2"}, Addr: net.ParseIP("127.0.0.1"), Port: 11252, Status: Alive} + + mm := NewMemberMap(&SuspicionConfig{}) + mm.members[m1.ID] = m1 + mm.members[m2.ID] = m2 + + mI := &Member{ + ID: MemberID{ID: "mI"}, + Addr: net.ParseIP("127.0.0.1"), + Port: 11254, + } + swim := &SWIM{} swim.member = mI swim.config = config @@ -1221,12 +1512,13 @@ func TestSWIM_indirectProbe_When_Successfully_Probed(t *testing.T) { // setup SWIM's message endpoint tc := PacketTransportConfig{ BindAddress: "127.0.0.1", - BindPort: 11154, + BindPort: 11254, } p, _ := NewPacketTransport(&tc) meConfig := MessageEndpointConfig{ EncryptionEnabled: false, + SendTimeout: time.Second * 3, CallbackCollectInterval: time.Hour, } messageEndpoint, _ := NewMessageEndpoint(meConfig, p, swim) @@ -1242,14 +1534,378 @@ func TestSWIM_indirectProbe_When_Successfully_Probed(t *testing.T) { go messageEndpoint.Listen() defer messageEndpoint.Shutdown() - go swim.indirectProbe(mJ) + err := swim.indirectProbe(mJ) + assert.NoError(t, err) +} + +func TestSWIM_probe_When_Member_Is_Dead(t *testing.T) { + swim := &SWIM{} + member := Member{ + ID: MemberID{ID: "111"}, + Status: Dead, + } + swim.probe(member) +} + +func TestSWIM_probe_When_Target_Respond_To_Ping(t *testing.T) { + // setup M_J member + mJMember := Member{ID: MemberID{ID: "mj"}, Addr: net.ParseIP("127.0.0.1"), Port: 11161, Status: Alive} + mJMessageHandler := &MockMessageHandler{} + + mJMessageEndpoint := createMessageEndpoint(t, mJMessageHandler, time.Second, 11161) + mJMessageHandler.handleFunc = func(msg pb.Message) { + ack := pb.Message{Id: msg.Id, Payload: &pb.Message_Ack{Ack: &pb.Ack{}}, PiggyBack: &pb.PiggyBack{}} + mJMessageEndpoint.Send("127.0.0.1:11162", ack) + } + go mJMessageEndpoint.Listen() + defer mJMessageEndpoint.Shutdown() + + // setup M_I member + config := &Config{ + BindAddress: "127.0.0.1", + BindPort: 11162, + K: 2, + T: 5000, + } + + pbkStore := &MockMbrStatsMsgStore{} + pbkStore.GetFunc = func() (pb.MbrStatsMsg, error) { + return pb.MbrStatsMsg{}, nil + } + + mm := NewMemberMap(&SuspicionConfig{}) - select { - case <-end: + mI := &Member{ + ID: MemberID{ID: "mI"}, + Addr: net.ParseIP("127.0.0.1"), + Port: 11162, + } + + swim := &SWIM{} + swim.member = mI + swim.config = config + swim.mbrStatsMsgStore = pbkStore + swim.memberMap = mm + + // setup SWIM's message endpoint + tc := PacketTransportConfig{ + BindAddress: "127.0.0.1", + BindPort: 11162, + } + p, _ := NewPacketTransport(&tc) + + meConfig := MessageEndpointConfig{ + EncryptionEnabled: false, + SendTimeout: time.Second * 3, + CallbackCollectInterval: time.Hour, + } + mIMessageEndpoint, _ := NewMessageEndpoint(meConfig, p, swim) + + swim.messageEndpoint = mIMessageEndpoint + + go mIMessageEndpoint.Listen() + defer mIMessageEndpoint.Shutdown() + + swim.probe(mJMember) +} + +// This test case is for testing if we failed at ping to M_J, then indirect-probing is execute +// with M_J respond ACK message +// +// Test case scenario: +// +// 1. M_I send ping message to M_J +// 2. M_J ignore ping message +// 3. M_I start indirect-probe, so send indirect-ping message to mk1, mk2 +// 4. After mk1, mk2 received indirect-ping, then ping to M_J +// 5. M_J response with ACK message to both of mk1, mk2 +// 6. mk1, mk2 send back ACK message to M_I +// 7. M_I successfully finish indirect-probing +// +func TestSWIM_probe_When_Target_Respond_To_Indirect_Ping(t *testing.T) { + mJAddress := "127.0.0.1:11161" + mIAddress := "127.0.0.1:11162" + mK1Address := "127.0.0.1:11163" + mK2Address := "127.0.0.1:11164" + + // + // setup M_J message endpoint, SWIM + // + mJMessageHandler := &MockMessageHandler{} + mJMessageEndpoint := createMessageEndpoint(t, mJMessageHandler, time.Second, 11161) + + // only ignore first message + i := 0 + mJMessageHandler.handleFunc = func(msg pb.Message) { + // First message handling should ignore, for starting + // indirect-probing + if i == 0 { + i++ + return + } + + // handle messages from mk1, mk2, response with ACK message + t.Logf("handling message from [%s]", msg.Address) + ack := pb.Message{Id: msg.Id, Payload: &pb.Message_Ack{Ack: &pb.Ack{}}, PiggyBack: &pb.PiggyBack{}} + mJMessageEndpoint.Send(msg.Address, ack) + } + + // + // setup M_I message endpoint, SWIM + // + config := &Config{ + BindAddress: "127.0.0.1", + BindPort: 11162, + K: 2, + T: 5000, + } + + pbkStore := &MockMbrStatsMsgStore{} + pbkStore.GetFunc = func() (pb.MbrStatsMsg, error) { + return pb.MbrStatsMsg{}, nil + } + + mI := &Member{ + ID: MemberID{ID: "mI"}, + Addr: net.ParseIP("127.0.0.1"), + Port: 11162, + } + + m1Member := &Member{ID: MemberID{ID: "m1"}, Addr: net.ParseIP("127.0.0.1"), Port: 11163, Status: Alive} + m2Member := &Member{ID: MemberID{ID: "m2"}, Addr: net.ParseIP("127.0.0.1"), Port: 11164, Status: Alive} + + mm := NewMemberMap(&SuspicionConfig{}) + mm.members[m1Member.ID] = m1Member + mm.members[m2Member.ID] = m2Member + + swim := &SWIM{} + swim.member = mI + swim.config = config + swim.mbrStatsMsgStore = pbkStore + swim.memberMap = mm + + // setup SWIM's message endpoint + tc := PacketTransportConfig{ + BindAddress: "127.0.0.1", + BindPort: 11162, + } + p, _ := NewPacketTransport(&tc) + + meConfig := MessageEndpointConfig{ + EncryptionEnabled: false, + SendTimeout: time.Second * 3, + CallbackCollectInterval: time.Hour, + } + mIMessageEndpoint, _ := NewMessageEndpoint(meConfig, p, swim) + + swim.messageEndpoint = mIMessageEndpoint + + go mIMessageEndpoint.Listen() + defer mIMessageEndpoint.Shutdown() + + // + // setup M_K_1 message endpoint + // + mk1MessageHandler := &MockMessageHandler{} + mk1MessageEndpoint := createMessageEndpoint(t, mk1MessageHandler, time.Second, 11163) + + mk1MessageHandler.handleFunc = func(msg pb.Message) { + // test whether received message type is indirect-ping + assert.Equal(t, msg.Payload.(*pb.Message_IndirectPing).IndirectPing, &pb.IndirectPing{Target: "127.0.0.1:11161"}) + assert.Equal(t, msg.Payload.(*pb.Message_IndirectPing).IndirectPing.Target, "127.0.0.1:11161") + assert.Equal(t, msg.Address, mIAddress) + + ping := createPingMessage(mK1Address, &pb.MbrStatsMsg{}) + // Send ping message to mj + mk1MessageEndpoint.SyncSend(mJAddress, ping) + ack := createAckMessage(msg.Id, &pb.MbrStatsMsg{}) + // send back to mi + mk1MessageEndpoint.Send(mIAddress, ack) return - case <-indFailed: - panic("This shouldn't be called") } + + // + // setup M_K_2 message endpoint, + // + mk2MessageHandler := &MockMessageHandler{} + mk2MessageEndpoint := createMessageEndpoint(t, mk2MessageHandler, time.Second, 11164) + + mk2MessageHandler.handleFunc = func(msg pb.Message) { + // test whether received message type is indirect-ping + assert.Equal(t, msg.Payload.(*pb.Message_IndirectPing).IndirectPing, &pb.IndirectPing{Target: "127.0.0.1:11161"}) + assert.Equal(t, msg.Payload.(*pb.Message_IndirectPing).IndirectPing.Target, "127.0.0.1:11161") + assert.Equal(t, msg.Address, mIAddress) + + ping := createPingMessage(mK2Address, &pb.MbrStatsMsg{}) + // Send ping message to mj + mk2MessageEndpoint.SyncSend(mJAddress, ping) + ack := createAckMessage(msg.Id, &pb.MbrStatsMsg{}) + // send back to mi + mk2MessageEndpoint.Send(mIAddress, ack) + return + } + + go mJMessageEndpoint.Listen() + go mIMessageEndpoint.Listen() + go mk1MessageEndpoint.Listen() + go mk2MessageEndpoint.Listen() + defer func() { + mJMessageEndpoint.Shutdown() + mIMessageEndpoint.Shutdown() + mk1MessageEndpoint.Shutdown() + mk2MessageEndpoint.Shutdown() + }() + + mJMember := Member{ID: MemberID{ID: "mj"}, Addr: net.ParseIP("127.0.0.1"), Port: 11161, Status: Alive} + swim.probe(mJMember) +} + +// This test case is for testing if we failed at ping to M_J, then indirect-probing is execute +// but M_J not response to indirect-ping message so probe finally failed +// +// Test case scenario: +// +// 1. M_I send ping message to M_J +// 2. M_J ignore ping message +// 3. M_I start indirect-probe, so send indirect-ping message to mk1, mk2 +// 4. After mk1, mk2 received indirect-ping, then ping to M_J +// 5. M_J NOT response to both of mk1, mk2 +// 6. mk1, mk2 send back NACK message to M_I +// 7. M_I finally failed indirect-probing, then suspect M_J +// +func TestSWIM_probe_When_Target_Not_Respond_To_Indirect_Ping(t *testing.T) { + mJAddress := "127.0.0.1:11161" + mIAddress := "127.0.0.1:11162" + mK1Address := "127.0.0.1:11163" + mK2Address := "127.0.0.1:11164" + + // + // setup M_J message endpoint, SWIM + // + mJMessageHandler := &MockMessageHandler{} + mJMessageEndpoint := createMessageEndpoint(t, mJMessageHandler, time.Second, 11161) + + // M_J not response to both of ping message and indirect-ping message + mJMessageHandler.handleFunc = func(msg pb.Message) { + return + } + + // + // setup M_I message endpoint, SWIM + // + config := &Config{ + BindAddress: "127.0.0.1", + BindPort: 11162, + K: 2, + T: 5000, + } + + pbkStore := &MockMbrStatsMsgStore{} + pbkStore.GetFunc = func() (pb.MbrStatsMsg, error) { + return pb.MbrStatsMsg{}, nil + } + + mI := &Member{ + ID: MemberID{ID: "mI"}, + Addr: net.ParseIP("127.0.0.1"), + Port: 11162, + } + + m1Member := &Member{ID: MemberID{ID: "m1"}, Addr: net.ParseIP("127.0.0.1"), Port: 11163, Status: Alive} + m2Member := &Member{ID: MemberID{ID: "m2"}, Addr: net.ParseIP("127.0.0.1"), Port: 11164, Status: Alive} + + mm := NewMemberMap(&SuspicionConfig{}) + mm.members[m1Member.ID] = m1Member + mm.members[m2Member.ID] = m2Member + + swim := &SWIM{} + swim.member = mI + swim.config = config + swim.mbrStatsMsgStore = pbkStore + swim.memberMap = mm + + // setup SWIM's message endpoint + tc := PacketTransportConfig{ + BindAddress: "127.0.0.1", + BindPort: 11162, + } + p, _ := NewPacketTransport(&tc) + + meConfig := MessageEndpointConfig{ + EncryptionEnabled: false, + SendTimeout: time.Second * 3, + CallbackCollectInterval: time.Hour, + } + mIMessageEndpoint, _ := NewMessageEndpoint(meConfig, p, swim) + + swim.messageEndpoint = mIMessageEndpoint + + go mIMessageEndpoint.Listen() + defer mIMessageEndpoint.Shutdown() + + // + // setup M_K_1 message endpoint + // + mk1MessageHandler := &MockMessageHandler{} + mk1MessageEndpoint := createMessageEndpoint(t, mk1MessageHandler, time.Second, 11163) + + mk1MessageHandler.handleFunc = func(msg pb.Message) { + // test whether received message type is indirect-ping + assert.Equal(t, msg.Payload.(*pb.Message_IndirectPing).IndirectPing, &pb.IndirectPing{Target: "127.0.0.1:11161"}) + assert.Equal(t, msg.Payload.(*pb.Message_IndirectPing).IndirectPing.Target, "127.0.0.1:11161") + assert.Equal(t, msg.Address, mIAddress) + + ping := createPingMessage(mK1Address, &pb.MbrStatsMsg{}) + // Send ping message to mj + _, err := mk1MessageEndpoint.SyncSend(mJAddress, ping) + + assert.Error(t, err, ErrSendTimeout) + + // mk1 response with nack message + nack := createNackMessage(msg.Id, &pb.MbrStatsMsg{}) + // send back to mi + mk1MessageEndpoint.Send(mIAddress, nack) + return + } + + // + // setup M_K_2 message endpoint, + // + mk2MessageHandler := &MockMessageHandler{} + mk2MessageEndpoint := createMessageEndpoint(t, mk2MessageHandler, time.Second, 11164) + + mk2MessageHandler.handleFunc = func(msg pb.Message) { + // test whether received message type is indirect-ping + assert.Equal(t, msg.Payload.(*pb.Message_IndirectPing).IndirectPing, &pb.IndirectPing{Target: "127.0.0.1:11161"}) + assert.Equal(t, msg.Payload.(*pb.Message_IndirectPing).IndirectPing.Target, "127.0.0.1:11161") + assert.Equal(t, msg.Address, mIAddress) + + ping := createPingMessage(mK2Address, &pb.MbrStatsMsg{}) + // Send ping message to mj + _, err := mk2MessageEndpoint.SyncSend(mJAddress, ping) + + assert.Error(t, err, ErrSendTimeout) + + nack := createNackMessage(msg.Id, &pb.MbrStatsMsg{}) + // send back to mi + mk2MessageEndpoint.Send(mIAddress, nack) + return + } + + go mJMessageEndpoint.Listen() + go mIMessageEndpoint.Listen() + go mk1MessageEndpoint.Listen() + go mk2MessageEndpoint.Listen() + defer func() { + mJMessageEndpoint.Shutdown() + mIMessageEndpoint.Shutdown() + mk1MessageEndpoint.Shutdown() + mk2MessageEndpoint.Shutdown() + }() + + mJMember := Member{ID: MemberID{ID: "mj"}, Addr: net.ParseIP("127.0.0.1"), Port: 11161, Status: Alive} + + swim.probe(mJMember) } func createMessageEndpoint(t *testing.T, messageHandler MessageHandler, sendTimeout time.Duration, port int) MessageEndpoint {