diff --git a/.gitignore b/.gitignore
index 350a5dfb4..40a5425a9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,4 +5,4 @@
 tools/include
 .vscode
 .idea
-*.log
+.log
diff --git a/pkg/common/event/dml_event.go b/pkg/common/event/dml_event.go
index 27d0b1584..cc195320a 100644
--- a/pkg/common/event/dml_event.go
+++ b/pkg/common/event/dml_event.go
@@ -292,6 +292,10 @@ func (t *DMLEvent) decodeV0(data []byte) error {
 // AssembleRows assembles the Rows from the RawRows.
 // It also sets the TableInfo and clears the RawRows.
 func (t *DMLEvent) AssembleRows(tableInfo *common.TableInfo) error {
+	// t.Rows is already set, no need to assemble again
+	if t.Rows != nil {
+		return nil
+	}
 	if tableInfo == nil {
 		log.Panic("DMLEvent: TableInfo is nil")
 		return nil
diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go
index dcd20bbf6..5ed41572f 100644
--- a/pkg/eventservice/event_broker.go
+++ b/pkg/eventservice/event_broker.go
@@ -272,8 +272,8 @@ func (c *eventBroker) wakeDispatcher(dispatcherID common.DispatcherID) {
 // checkNeedScan checks if the dispatcher needs to scan the event store.
 // If the dispatcher needs to scan the event store, it returns true.
 // If the dispatcher does not need to scan the event store, it send the watermark to the dispatcher
-func (c *eventBroker) checkNeedScan(ctx context.Context, task scanTask) (bool, common.DataRange) {
-	c.checkAndInitDispatcher(ctx, task)
+func (c *eventBroker) checkNeedScan(task scanTask) (bool, common.DataRange) {
+	c.checkAndInitDispatcher(task)
 
 	dataRange, needScan := task.dispatcherStat.getDataRange()
 	if !needScan {
@@ -300,7 +300,7 @@ func (c *eventBroker) checkNeedScan(ctx context.Context, task scanTask) (bool, c
 	return true, dataRange
 }
 
-func (c *eventBroker) checkAndInitDispatcher(ctx context.Context, task scanTask) {
+func (c *eventBroker) checkAndInitDispatcher(task scanTask) {
 	if task.dispatcherStat.isInitialized.Load() {
 		return
 	}
@@ -350,7 +350,7 @@ func (c *eventBroker) doScan(ctx context.Context, task scanTask) {
 		return
 	}
 
-	needScan, dataRange := c.checkNeedScan(ctx, task)
+	needScan, dataRange := c.checkNeedScan(task)
 	if !needScan {
 		return
 	}
diff --git a/pkg/eventservice/event_broker_test.go b/pkg/eventservice/event_broker_test.go
index 2043b8c37..b8cbfba06 100644
--- a/pkg/eventservice/event_broker_test.go
+++ b/pkg/eventservice/event_broker_test.go
@@ -3,7 +3,6 @@ package eventservice
 import (
 	"context"
 	"fmt"
-	"math/rand"
 	"sync"
 	"testing"
 	"time"
@@ -37,7 +36,6 @@ func TestNewDispatcherStat(t *testing.T) {
 
 func TestDispatcherStatUpdateWatermark(t *testing.T) {
 	startTs := uint64(123)
-	wg := &sync.WaitGroup{}
 	info := &mockDispatcherInfo{
 		id:        common.NewDispatcherID(),
 		clusterID: 1,
@@ -47,50 +45,26 @@ func TestDispatcherStatUpdateWatermark(t *testing.T) {
 	spanSubscription := newSpanSubscription(info.GetTableSpan(), startTs)
 	stat := newDispatcherStat(startTs, info, spanSubscription, nil)
 
-	sendNewEvent := func(maxTs uint64) {
-		g := &sync.WaitGroup{}
-		for i := 0; i < 64; i++ {
-			ts := rand.Uint64() % maxTs
-			if i == 10 {
-				ts = maxTs
-			}
-			g.Add(1)
-			go func() {
-				defer g.Done()
-				stat.spanSubscription.onSubscriptionWatermark(ts)
-			}()
-		}
-		g.Wait()
-	}
-
 	// Case 1: no new events, only watermark change
 	stat.spanSubscription.onSubscriptionWatermark(456)
 	require.Equal(t, uint64(456), stat.spanSubscription.watermark.Load())
 	log.Info("pass TestDispatcherStatUpdateWatermark case 1")
 
 	// Case 2: new events, and watermark increase
-	sendNewEvent(startTs)
 	stat.spanSubscription.onSubscriptionWatermark(789)
+	stat.spanSubscription.onNewCommitTs(360)
+	require.Equal(t, uint64(360), stat.spanSubscription.maxEventCommitTs.Load())
 	require.Equal(t, uint64(789), stat.spanSubscription.watermark.Load())
-	require.Equal(t, startTs, stat.spanSubscription.maxEventCommitTs.Load())
 	log.Info("pass TestDispatcherStatUpdateWatermark case 2")
 
 	// Case 3: new events, and watermark decrease
-	// watermark should not decrease and no notification
-	sendNewEvent(360)
-	done := make(chan struct{})
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-		stat.spanSubscription.onSubscriptionWatermark(456)
-		close(done)
-	}()
-	<-done
+	// watermark should not decrease
+	stat.spanSubscription.onSubscriptionWatermark(456)
+	stat.spanSubscription.onNewCommitTs(800)
 	require.Equal(t, uint64(789), stat.spanSubscription.watermark.Load())
-	require.Equal(t, uint64(360), stat.spanSubscription.maxEventCommitTs.Load())
+	require.Equal(t, uint64(800), stat.spanSubscription.maxEventCommitTs.Load())
 	log.Info("pass TestDispatcherStatUpdateWatermark case 3")
-	wg.Wait()
 }
 
 func newTableSpan(tableID int64, start, end string) *heartbeatpb.TableSpan {
@@ -167,7 +141,7 @@ func TestSendEvents(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	eventStore := newMockEventStore()
+	eventStore := newMockEventStore(100)
 	schemaStore := newMockSchemaStore()
 	msgCh := make(chan *messaging.TargetMessage, 1024)
 	mc := &mockMessageCenter{messageCh: msgCh}
@@ -220,8 +194,9 @@ func TestSendEvents(t *testing.T) {
 
 	schemaStore.AppendDDLEvent(tableID, ddlEvent, ddlEvent1, ddlEvent2)
 
-	span, ok := eventStore.spans[tableID]
+	v, ok := eventStore.spansMap.Load(tableID)
 	require.True(t, ok)
+	span := v.(*mockSpanStats)
 	span.update(ddlEvent2.FinishedTs+1, append(kvEvents, kvEvents2...)...)
 
 	wg.Wait()
diff --git a/pkg/eventservice/event_service_performance_test.go b/pkg/eventservice/event_service_performance_test.go
index bb7cea21c..a3b710603 100644
--- a/pkg/eventservice/event_service_performance_test.go
+++ b/pkg/eventservice/event_service_performance_test.go
@@ -25,7 +25,7 @@ func TestEventServiceOneMillionTable(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	wg := &sync.WaitGroup{}
-	mockStore := newMockEventStore()
+	mockStore := newMockEventStore(100)
 	tableNum := 100_0000
 	sendRound := 10
 	mc := &mockMessageCenter{
@@ -73,11 +73,12 @@ func TestEventServiceOneMillionTable(t *testing.T) {
 			<-ticker.C
 			sendStart := time.Now()
 			for _, dispatcher := range dispatchers {
-				sub, ok := mockStore.spans[dispatcher.GetTableSpan().TableID]
+				v, ok := mockStore.spansMap.Load(dispatcher.GetTableSpan().TableID)
 				if !ok {
 					continue
 				}
-				sub.update(sub.watermark + 1)
+				spanStats := v.(*mockSpanStats)
+				spanStats.update(spanStats.watermark.Load() + 1)
 			}
 			log.Info("send resolvedTs events for 1 million tables", zap.Duration("cost", time.Since(sendStart)), zap.Any("round", round))
 			round++
diff --git a/pkg/eventservice/event_service_test.go b/pkg/eventservice/event_service_test.go
index d582d26dd..f3e15a706 100644
--- a/pkg/eventservice/event_service_test.go
+++ b/pkg/eventservice/event_service_test.go
@@ -3,7 +3,10 @@ package eventservice
 import (
 	"context"
 	"database/sql"
+	"math"
 	"sort"
+	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -95,12 +98,14 @@ var _ eventstore.EventStore = &mockEventStore{}
 
 // mockEventStore is a mock implementation of the EventStore interface
 type mockEventStore struct {
-	spans map[common.TableID]*mockSpanStats
+	resolvedTsUpdateInterval time.Duration
+	spansMap                 sync.Map
 }
 
-func newMockEventStore() *mockEventStore {
+func newMockEventStore(resolvedTsUpdateInterval int) *mockEventStore {
 	return &mockEventStore{
-		spans: make(map[common.TableID]*mockSpanStats),
+		resolvedTsUpdateInterval: time.Millisecond * time.Duration(resolvedTsUpdateInterval),
+		spansMap:                 sync.Map{},
 	}
 }
 
@@ -109,6 +114,22 @@ func (m *mockEventStore) Name() string {
 }
 
 func (m *mockEventStore) Run(ctx context.Context) error {
+	// Periodically loop over all spans and notify their watermarkNotifier.
+	ticker := time.NewTicker(m.resolvedTsUpdateInterval)
+	go func() {
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-ticker.C:
+				m.spansMap.Range(func(key, value any) bool {
+					spanStats := value.(*mockSpanStats)
+					spanStats.watermarkNotifier(spanStats.watermark.Load() + 1)
+					return true
+				})
+			}
+		}
+	}()
 	return nil
 }
@@ -128,13 +149,16 @@ func (m *mockEventStore) GetIterator(dispatcherID common.DispatcherID, dataRange
 	iter := &mockEventIterator{
 		events: make([]*common.RawKVEntry, 0),
 	}
-
-	for _, e := range m.spans[dataRange.Span.TableID].pendingEvents {
+	v, ok := m.spansMap.Load(dataRange.Span.TableID)
+	if !ok {
+		return nil, nil
+	}
+	spanStats := v.(*mockSpanStats)
+	for _, e := range spanStats.pendingEvents {
 		if e.CRTs > dataRange.StartTs && e.CRTs <= dataRange.EndTs {
 			iter.events = append(iter.events, e)
 		}
 	}
-
 	return iter, nil
 }
 
@@ -146,13 +170,14 @@ func (m *mockEventStore) RegisterDispatcher(
 	notifier eventstore.WatermarkNotifier,
 ) error {
 	log.Info("subscribe table span", zap.Any("span", span), zap.Uint64("startTs", uint64(startTS)))
-	m.spans[span.TableID] = &mockSpanStats{
+	spanStats := &mockSpanStats{
 		startTs:           uint64(startTS),
-		watermark:         uint64(startTS),
 		watermarkNotifier: notifier,
 		eventObserver:     observer,
 		pendingEvents:     make([]*common.RawKVEntry, 0),
 	}
+	spanStats.watermark.Store(uint64(startTS))
+	m.spansMap.Store(span.TableID, spanStats)
 	return nil
 }
 
@@ -197,7 +222,7 @@ func newMockSchemaStore() *mockSchemaStore {
 	return &mockSchemaStore{
 		DDLEvents:  make(map[common.TableID][]commonEvent.DDLEvent),
 		TableInfo:  make(map[common.TableID][]*common.TableInfo),
-		resolvedTs: 0,
+		resolvedTs: math.MaxUint64,
 	}
 }
 
@@ -277,7 +302,7 @@ func (m *mockSchemaStore) FetchTableTriggerDDLEvents(tableFilter filter.Filter,
 
 type mockSpanStats struct {
 	startTs           uint64
-	watermark         uint64
+	watermark         atomic.Uint64
 	pendingEvents     []*common.RawKVEntry
 	eventObserver     func(watermark uint64)
 	watermarkNotifier func(watermark uint64)
@@ -285,7 +310,7 @@ type mockSpanStats struct {
 
 func (m *mockSpanStats) update(watermark uint64, events ...*common.RawKVEntry) {
 	m.pendingEvents = append(m.pendingEvents, events...)
-	m.watermark = watermark
+	m.watermark.Store(watermark)
 	for _, e := range events {
 		m.eventObserver(e.CRTs)
 	}
@@ -373,8 +398,8 @@ func genEvents(helper *pevent.EventTestHelper, t *testing.T, ddl string, dmls ..
 	job := helper.DDL2Job(ddl)
 	schema := job.SchemaName
 	table := job.TableName
-	kvEvents1 := helper.DML2RawKv(schema, table, dmls...)
-	for _, e := range kvEvents1 {
+	kvEvents := helper.DML2RawKv(schema, table, dmls...)
+	for _, e := range kvEvents {
 		require.Equal(t, job.BinlogInfo.TableInfo.UpdateTS-1, e.StartTs)
 		require.Equal(t, job.BinlogInfo.TableInfo.UpdateTS+1, e.CRTs)
 	}
@@ -386,7 +411,7 @@ func genEvents(helper *pevent.EventTestHelper, t *testing.T, ddl string, dmls ..
 		TableName: job.TableName,
 		Query:     ddl,
 		TableInfo: common.WrapTableInfo(job.SchemaID, job.SchemaName, job.BinlogInfo.TableInfo),
-	}, kvEvents1
+	}, kvEvents
 }
 
 // This test is to test the mockEventIterator works as expected.
@@ -441,7 +466,9 @@ func TestEventServiceBasic(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	mockStore := newMockEventStore()
+	mockStore := newMockEventStore(100)
+	mockStore.Run(ctx)
+
 	mc := &mockMessageCenter{
 		messageCh: make(chan *messaging.TargetMessage, 100),
 	}
@@ -457,7 +484,7 @@ func TestEventServiceBasic(t *testing.T) {
 	require.Equal(t, 1, len(esImpl.brokers))
 	require.NotNil(t, esImpl.brokers[dispatcherInfo.GetClusterID()])
 
-	// add events to logpuller
+	// add events to eventStore
 	helper := pevent.NewEventTestHelper(t)
 	defer helper.Close()
 	ddlEvent, kvEvents := genEvents(helper, t, `create table test.t(id int primary key, c char(50))`, []string{
@@ -466,10 +493,11 @@ func TestEventServiceBasic(t *testing.T) {
 		`insert into test.t(id,c) values (2, "c2")`,
 	}...)
 	require.NotNil(t, kvEvents)
-
-	sourceSpanStat, ok := mockStore.spans[dispatcherInfo.span.TableID]
+	v, ok := mockStore.spansMap.Load(dispatcherInfo.span.TableID)
 	require.True(t, ok)
+	sourceSpanStat := v.(*mockSpanStats)
+
+	// add events to eventStore
 	sourceSpanStat.update(kvEvents[0].CRTs, kvEvents...)
 	schemastore := esImpl.schemaStore.(*mockSchemaStore)
 	schemastore.AppendDDLEvent(dispatcherInfo.span.TableID, ddlEvent)
@@ -481,22 +509,34 @@ func TestEventServiceBasic(t *testing.T) {
 		for _, m := range msg.Message {
 			msgCnt++
 			switch e := m.(type) {
+			case *commonEvent.HandshakeEvent:
+				require.NotNil(t, msg)
+				require.Equal(t, "event-collector", msg.Topic)
+				require.Equal(t, dispatcherInfo.id, e.DispatcherID)
+				require.Equal(t, dispatcherInfo.startTs, e.GetStartTs())
+				require.Equal(t, uint64(1), e.Seq)
+				log.Info("receive handshake event", zap.Any("event", e))
 			case *commonEvent.DMLEvent:
 				require.NotNil(t, msg)
 				require.Equal(t, "event-collector", msg.Topic)
 				require.Equal(t, len(kvEvents), e.Len())
 				require.Equal(t, kvEvents[0].CRTs, e.CommitTs)
+				require.Equal(t, uint64(2), e.Seq)
+				log.Info("receive dml event", zap.Any("event", e))
 			case *commonEvent.DDLEvent:
 				require.NotNil(t, msg)
 				require.Equal(t, "event-collector", msg.Topic)
 				require.Equal(t, ddlEvent.FinishedTs, e.FinishedTs)
+				require.Equal(t, uint64(3), e.Seq)
+				log.Info("receive ddl event", zap.Any("event", e))
 			case *commonEvent.BatchResolvedEvent:
 				require.NotNil(t, msg)
-				log.Info("received watermark", zap.Uint64("ts", e.Events[0].ResolvedTs))
+				log.Info("receive watermark", zap.Uint64("ts", e.Events[0].ResolvedTs))
 			}
 		}
-		if msgCnt == 3 {
+		if msgCnt == 4 {
 			break
 		}
 	}
+
 }
diff --git a/pkg/eventservice/handler.go b/pkg/eventservice/helper.go
similarity index 90%
rename from pkg/eventservice/handler.go
rename to pkg/eventservice/helper.go
index 011b29be7..bbaf739c7 100644
--- a/pkg/eventservice/handler.go
+++ b/pkg/eventservice/helper.go
@@ -1,8 +1,6 @@
 package eventservice
 
 import (
-	"context"
-
 	"github.com/flowbehappy/tigate/pkg/common"
 	"github.com/pingcap/log"
 )
@@ -23,8 +21,7 @@ func (h *dispatcherEventsHandler) Handle(broker *eventBroker, tasks ...scanTask)
 		log.Panic("only one task is allowed")
 	}
 	task := tasks[0]
-	ctx := context.Background()
-	needScan, _ := broker.checkNeedScan(ctx, task)
+	needScan, _ := broker.checkNeedScan(task)
 	if !needScan {
 		task.handle()
 		return false
diff --git a/pkg/messaging/message_center.go b/pkg/messaging/message_center.go
index 13a4a6032..60a446e94 100644
--- a/pkg/messaging/message_center.go
+++ b/pkg/messaging/message_center.go
@@ -361,5 +361,4 @@ func (s *grpcServer) handleConnect(msg *proto.Message, stream grpcSender, isEven
 	} else {
 		return remoteTarget.runCommandSendStream(stream)
 	}
-
 }
diff --git a/pkg/messaging/message_center_integration_test.go b/pkg/messaging/message_center_integration_test.go
index b6d748302..7b7293017 100644
--- a/pkg/messaging/message_center_integration_test.go
+++ b/pkg/messaging/message_center_integration_test.go
@@ -21,9 +21,9 @@ import (
 	"google.golang.org/grpc"
 )
 
-var epoch = uint64(1)
+var mockEpoch = uint64(1)
 
-func newMessageCenterForTest(t *testing.T) (*messageCenter, string, func()) {
+func NewMessageCenterForTest(t *testing.T) (*messageCenter, string, func()) {
 	port := freeport.GetPort()
 	addr := fmt.Sprintf("127.0.0.1:%d", port)
 	lis, err := net.Listen("tcp", addr)
@@ -35,8 +35,8 @@ func newMessageCenterForTest(t *testing.T) (*messageCenter, string, func()) {
 	ctx, cancel := context.WithCancel(context.Background())
 	mcConfig := config.NewDefaultMessageCenterConfig()
 	id := node.NewID()
-	mc := NewMessageCenter(ctx, id, epoch, mcConfig)
-	epoch++
+	mc := NewMessageCenter(ctx, id, mockEpoch, mcConfig)
+	mockEpoch++
 	mcs := NewMessageCenterServer(mc)
 	proto.RegisterMessageCenterServer(grpcServer, mcs)
 
@@ -55,66 +55,43 @@ func newMessageCenterForTest(t *testing.T) (*messageCenter, string, func()) {
 	return mc, string(addr), stop
 }
 
-func TestMessageCenterBasic(t *testing.T) {
-	mc1, mc1Addr, mc1Stop := newMessageCenterForTest(t)
-	mc2, mc2Addr, mc2Stop := newMessageCenterForTest(t)
-	mc3, mc3Addr, mc3Stop := newMessageCenterForTest(t)
-	defer mc1Stop()
-	defer mc2Stop()
-	defer mc3Stop()
-	helper := event.NewEventTestHelper(t)
-	defer helper.Close()
-
-	helper.Tk().MustExec("use test")
-	_ = helper.DDL2Job("create table t1(id int primary key, a int, b int, c int)")
-	dml1 := helper.DML2Event("test", "t1", "insert into t1 values (1, 1, 1, 1)")
-	require.NotNil(t, dml1)
-	dml2 := helper.DML2Event("test", "t1", "insert into t1 values (2, 2, 2, 2)")
-	require.NotNil(t, dml2)
-	dml3 := helper.DML2Event("test", "t1", "insert into t1 values (3, 3, 3, 3)")
-	require.NotNil(t, dml3)
-
-	topic1 := dml1.DispatcherID.String()
-	topic2 := dml2.DispatcherID.String()
-	topic3 := dml3.DispatcherID.String()
+func setupMessageCenters(t *testing.T) (*messageCenter, *messageCenter, *messageCenter, func()) {
+	mc1, mc1Addr, mc1Stop := NewMessageCenterForTest(t)
+	mc2, mc2Addr, mc2Stop := NewMessageCenterForTest(t)
+	mc3, mc3Addr, mc3Stop := NewMessageCenterForTest(t)
 
 	mc1.addTarget(mc2.id, mc2.epoch, mc2Addr)
 	mc1.addTarget(mc3.id, mc3.epoch, mc3Addr)
-	ch1 := make(chan *TargetMessage, 1)
-	h1 := func(ctx context.Context, msg *TargetMessage) error {
-		ch1 <- msg
-		log.Info("mc1 received message", zap.Any("msg", msg))
-		return nil
-	}
-	mc1.RegisterHandler(topic1, h1)
-
 	mc2.addTarget(mc1.id, mc1.epoch, mc1Addr)
 	mc2.addTarget(mc3.id, mc3.epoch, mc3Addr)
-	ch2 := make(chan *TargetMessage, 1)
-	h2 := func(ctx context.Context, msg *TargetMessage) error {
-		ch2 <- msg
-		log.Info("mc2 received message", zap.Any("msg", msg))
-		return nil
-	}
-	mc2.RegisterHandler(topic2, h2)
-
 	mc3.addTarget(mc1.id, mc1.epoch, mc1Addr)
 	mc3.addTarget(mc2.id, mc2.epoch, mc2Addr)
-	ch3 := make(chan *TargetMessage, 1)
-	h3 := func(ctx context.Context, msg *TargetMessage) error {
-		ch3 <- msg
-		log.Info("mc3 received message", zap.Any("msg", msg))
-		return nil
+
+	cleanup := func() {
+		mc1Stop()
+		mc2Stop()
+		mc3Stop()
 	}
-	mc3.RegisterHandler(topic3, h3)
+	return mc1, mc2, mc3, cleanup
+}
+
+func registerHandler(mc *messageCenter, topic string) chan *TargetMessage {
+	ch := make(chan *TargetMessage, 1)
+	mc.RegisterHandler(topic, func(ctx context.Context, msg *TargetMessage) error {
+		ch <- msg
+		log.Info(fmt.Sprintf("%s received message", mc.id), zap.Any("msg", msg))
+		return nil
+	})
+	return ch
+}
+
+func waitForTargetsReady(mc *messageCenter) {
 	// wait for all targets to be ready
 	time.Sleep(time.Second)
-LOOP:
 	for {
 		allReady := true
-		for _, target := range mc1.remoteTargets.m {
+		for _, target := range mc.remoteTargets.m {
 			if !target.isReadyToSend() {
 				log.Info("target is not ready, retry it later", zap.String("target", target.targetId.String()))
 				allReady = false
@@ -123,88 +100,81 @@ LOOP:
 			}
 		}
 		if allReady {
 			log.Info("All targets are ready")
-			break LOOP
+			return
 		}
 		time.Sleep(time.Millisecond * 100)
 	}
+}
+
+func sendAndReceiveMessage(t *testing.T, sender *messageCenter, receiver *messageCenter, topic string, event *commonEvent.DMLEvent) {
+	targetMsg := NewSingleTargetMessage(receiver.id, topic, event)
+	ch := make(chan *TargetMessage, 1)
+	receiver.RegisterHandler(topic, func(ctx context.Context, msg *TargetMessage) error {
+		ch <- msg
+		return nil
+	})
 
-	//Case1: Send a message from mc1 to mc1, local message.
-	targetMsg := NewSingleTargetMessage(mc1.id, topic1, dml1)
-	var receivedMsg *TargetMessage
 	timeoutCh := time.After(30 * time.Second)
-LOOP2:
 	for {
-		err := mc1.SendEvent(targetMsg)
+		err := sender.SendEvent(targetMsg)
 		require.NoError(t, err)
 		select {
-		case receivedMsg = <-ch1:
-			break LOOP2
+		case receivedMsg := <-ch:
+			validateReceivedMessage(t, targetMsg, receivedMsg, sender.id, event)
+			return
 		case <-timeoutCh:
-			t.Fatal("Timeout when sending message to local target")
+			t.Fatal("Timeout when sending message")
 		default:
 			log.Info("waiting for message, retry to send it later")
 			time.Sleep(time.Millisecond * 100)
-			continue
 		}
 	}
+}
 
+func validateReceivedMessage(t *testing.T, targetMsg *TargetMessage, receivedMsg *TargetMessage, senderID node.ID, event *commonEvent.DMLEvent) {
 	require.Equal(t, targetMsg.To, receivedMsg.To)
-	require.Equal(t, mc1.id, receivedMsg.From)
+	require.Equal(t, senderID, receivedMsg.From)
 	require.Equal(t, targetMsg.Type, receivedMsg.Type)
-	require.Equal(t, targetMsg.Message, receivedMsg.Message)
-	log.Info("Pass test 1: send and receive local message", zap.Any("receivedMsg", receivedMsg))
+	receivedEvent := receivedMsg.Message[0].(*commonEvent.DMLEvent)
+	receivedEvent.AssembleRows(event.TableInfo)
+	require.Equal(t, event.Rows.ToString(event.TableInfo.GetFieldSlice()), receivedEvent.Rows.ToString(event.TableInfo.GetFieldSlice()))
+}
 
-	//Case2: Send a message from mc1 to mc2, remote message.
-	targetMsg = NewSingleTargetMessage(mc2.id, topic2, dml2)
-	timeoutCh = time.After(30 * time.Second)
-LOOP3:
-	for {
-		err := mc1.SendEvent(targetMsg)
-		require.NoError(t, err)
-		select {
-		case receivedMsg = <-ch2:
-			break LOOP3
-		case <-timeoutCh:
-			t.Fatal("Timeout when sending message to remote target")
-		default:
-			log.Info("waiting for message, retry to send it later")
+func TestMessageCenterBasic(t *testing.T) {
+	mc1, mc2, mc3, cleanup := setupMessageCenters(t)
+	defer cleanup()
 
-			time.Sleep(time.Millisecond * 100)
-			continue
-		}
-	}
-	require.Equal(t, targetMsg.To, receivedMsg.To)
-	require.Equal(t, mc1.id, receivedMsg.From)
-	require.Equal(t, targetMsg.Type, receivedMsg.Type)
-	receivedEvent := receivedMsg.Message[0].(*commonEvent.DMLEvent)
-	receivedEvent.AssembleRows(dml2.TableInfo)
-	require.Equal(t, dml2.Rows.ToString(dml2.TableInfo.GetFieldSlice()), receivedEvent.Rows.ToString(dml2.TableInfo.GetFieldSlice()))
-	log.Info("Pass test 2: send and receive remote message", zap.Any("receivedMsg", receivedMsg))
-
-	//Case3: Send a message from mc2 to mc3, remote message.
-	targetMsg = NewSingleTargetMessage(mc3.id, topic3, dml3)
-	timeoutCh = time.After(30 * time.Second)
-LOOP4:
-	for {
-		err := mc2.SendEvent(targetMsg)
-		require.NoError(t, err)
-		select {
-		case receivedMsg = <-ch3:
-			break LOOP4
-		case <-timeoutCh:
-			t.Fatal("Timeout when sending message to remote target")
-		default:
-			log.Info("waiting for message, retry to send it later")
+	helper := event.NewEventTestHelper(t)
+	defer helper.Close()
 
-			time.Sleep(time.Millisecond * 100)
-			continue
-		}
-	}
-	require.Equal(t, targetMsg.To, receivedMsg.To)
-	require.Equal(t, mc2.id, receivedMsg.From)
-	require.Equal(t, targetMsg.Type, receivedMsg.Type)
-	receivedEvent = receivedMsg.Message[0].(*commonEvent.DMLEvent)
-	receivedEvent.AssembleRows(dml3.TableInfo)
-	require.Equal(t, dml3.Rows.ToString(dml3.TableInfo.GetFieldSlice()), receivedEvent.Rows.ToString(dml3.TableInfo.GetFieldSlice()))
-	log.Info("Pass test 3: send and receive remote message", zap.Any("receivedMsg", receivedMsg))
+	helper.Tk().MustExec("use test")
+	_ = helper.DDL2Job("create table t1(id int primary key, a int, b int, c int)")
+	dml1 := helper.DML2Event("test", "t1", "insert into t1 values (1, 1, 1, 1)")
+	dml2 := helper.DML2Event("test", "t1", "insert into t1 values (2, 2, 2, 2)")
+	dml3 := helper.DML2Event("test", "t1", "insert into t1 values (3, 3, 3, 3)")
+
+	topic1 := "topic1"
+	topic2 := "topic2"
+	topic3 := "topic3"
+
+	registerHandler(mc1, topic1)
+	registerHandler(mc2, topic2)
+	registerHandler(mc3, topic3)
+
+	time.Sleep(time.Second)
+	waitForTargetsReady(mc1)
+	waitForTargetsReady(mc2)
+	waitForTargetsReady(mc3)
+
+	// Case 1: Send a message from mc1 to mc1 (local message)
+	sendAndReceiveMessage(t, mc1, mc1, topic1, dml1)
+	log.Info("Pass test 1: send and receive local message")
+
+	// Case 2: Send a message from mc1 to mc2 (remote message)
+	sendAndReceiveMessage(t, mc1, mc2, topic2, dml2)
+	log.Info("Pass test 2: send and receive remote message")
+
+	// Case 3: Send a message from mc2 to mc3 (remote message)
+	sendAndReceiveMessage(t, mc2, mc3, topic3, dml3)
+	log.Info("Pass test 3: send and receive remote message")
 }