Skip to content

Commit

Permalink
eventService: fix integration test (pingcap#408)
Browse files Browse the repository at this point in the history
* adjust some test code

Signed-off-by: dongmen <414110582@qq.com>

* eventService: fix integration test

Signed-off-by: dongmen <414110582@qq.com>

* eventService: fix integration test

Signed-off-by: dongmen <414110582@qq.com>

---------

Signed-off-by: dongmen <414110582@qq.com>
  • Loading branch information
asddongmen authored Oct 24, 2024
1 parent 2168025 commit 9538b85
Show file tree
Hide file tree
Showing 9 changed files with 167 additions and 181 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ tools/include

.vscode
.idea
*.log
.log
4 changes: 4 additions & 0 deletions pkg/common/event/dml_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,10 @@ func (t *DMLEvent) decodeV0(data []byte) error {
// AssembleRows assembles the Rows from the RawRows.
// It also sets the TableInfo and clears the RawRows.
func (t *DMLEvent) AssembleRows(tableInfo *common.TableInfo) error {
// t.Rows is already set, no need to assemble again
if t.Rows != nil {
return nil
}
if tableInfo == nil {
log.Panic("DMLEvent: TableInfo is nil")
return nil
Expand Down
8 changes: 4 additions & 4 deletions pkg/eventservice/event_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,8 @@ func (c *eventBroker) wakeDispatcher(dispatcherID common.DispatcherID) {
// checkNeedScan checks if the dispatcher needs to scan the event store.
// If the dispatcher needs to scan the event store, it returns true.
// If the dispatcher does not need to scan the event store, it send the watermark to the dispatcher
func (c *eventBroker) checkNeedScan(ctx context.Context, task scanTask) (bool, common.DataRange) {
c.checkAndInitDispatcher(ctx, task)
func (c *eventBroker) checkNeedScan(task scanTask) (bool, common.DataRange) {
c.checkAndInitDispatcher(task)

dataRange, needScan := task.dispatcherStat.getDataRange()
if !needScan {
Expand All @@ -300,7 +300,7 @@ func (c *eventBroker) checkNeedScan(ctx context.Context, task scanTask) (bool, c
return true, dataRange
}

func (c *eventBroker) checkAndInitDispatcher(ctx context.Context, task scanTask) {
func (c *eventBroker) checkAndInitDispatcher(task scanTask) {
if task.dispatcherStat.isInitialized.Load() {
return
}
Expand Down Expand Up @@ -350,7 +350,7 @@ func (c *eventBroker) doScan(ctx context.Context, task scanTask) {
return
}

needScan, dataRange := c.checkNeedScan(ctx, task)
needScan, dataRange := c.checkNeedScan(task)
if !needScan {
return
}
Expand Down
43 changes: 9 additions & 34 deletions pkg/eventservice/event_broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package eventservice
import (
"context"
"fmt"
"math/rand"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -37,7 +36,6 @@ func TestNewDispatcherStat(t *testing.T) {

func TestDispatcherStatUpdateWatermark(t *testing.T) {
startTs := uint64(123)
wg := &sync.WaitGroup{}
info := &mockDispatcherInfo{
id: common.NewDispatcherID(),
clusterID: 1,
Expand All @@ -47,50 +45,26 @@ func TestDispatcherStatUpdateWatermark(t *testing.T) {
spanSubscription := newSpanSubscription(info.GetTableSpan(), startTs)
stat := newDispatcherStat(startTs, info, spanSubscription, nil)

sendNewEvent := func(maxTs uint64) {
g := &sync.WaitGroup{}
for i := 0; i < 64; i++ {
ts := rand.Uint64() % maxTs
if i == 10 {
ts = maxTs
}
g.Add(1)
go func() {
defer g.Done()
stat.spanSubscription.onSubscriptionWatermark(ts)
}()
}
g.Wait()
}

// Case 1: no new events, only watermark change
stat.spanSubscription.onSubscriptionWatermark(456)
require.Equal(t, uint64(456), stat.spanSubscription.watermark.Load())
log.Info("pass TestDispatcherStatUpdateWatermark case 1")

// Case 2: new events, and watermark increase
sendNewEvent(startTs)
stat.spanSubscription.onSubscriptionWatermark(789)
stat.spanSubscription.onNewCommitTs(360)
require.Equal(t, uint64(360), stat.spanSubscription.maxEventCommitTs.Load())
require.Equal(t, uint64(789), stat.spanSubscription.watermark.Load())
require.Equal(t, startTs, stat.spanSubscription.maxEventCommitTs.Load())
log.Info("pass TestDispatcherStatUpdateWatermark case 2")

// Case 3: new events, and watermark decrease
// watermark should not decrease and no notification
sendNewEvent(360)
done := make(chan struct{})
wg.Add(1)
go func() {
defer wg.Done()
stat.spanSubscription.onSubscriptionWatermark(456)
close(done)
}()
<-done
// watermark should not decrease
stat.spanSubscription.onSubscriptionWatermark(456)
stat.spanSubscription.onNewCommitTs(800)
require.Equal(t, uint64(789), stat.spanSubscription.watermark.Load())
require.Equal(t, uint64(360), stat.spanSubscription.maxEventCommitTs.Load())
require.Equal(t, uint64(800), stat.spanSubscription.maxEventCommitTs.Load())
log.Info("pass TestDispatcherStatUpdateWatermark case 3")

wg.Wait()
}

func newTableSpan(tableID int64, start, end string) *heartbeatpb.TableSpan {
Expand Down Expand Up @@ -167,7 +141,7 @@ func TestSendEvents(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

eventStore := newMockEventStore()
eventStore := newMockEventStore(100)
schemaStore := newMockSchemaStore()
msgCh := make(chan *messaging.TargetMessage, 1024)
mc := &mockMessageCenter{messageCh: msgCh}
Expand Down Expand Up @@ -220,8 +194,9 @@ func TestSendEvents(t *testing.T) {

schemaStore.AppendDDLEvent(tableID, ddlEvent, ddlEvent1, ddlEvent2)

span, ok := eventStore.spans[tableID]
v, ok := eventStore.spansMap.Load(tableID)
require.True(t, ok)
span := v.(*mockSpanStats)
span.update(ddlEvent2.FinishedTs+1, append(kvEvents, kvEvents2...)...)

wg.Wait()
Expand Down
7 changes: 4 additions & 3 deletions pkg/eventservice/event_service_performance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestEventServiceOneMillionTable(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wg := &sync.WaitGroup{}
mockStore := newMockEventStore()
mockStore := newMockEventStore(100)
tableNum := 100_0000
sendRound := 10
mc := &mockMessageCenter{
Expand Down Expand Up @@ -73,11 +73,12 @@ func TestEventServiceOneMillionTable(t *testing.T) {
<-ticker.C
sendStart := time.Now()
for _, dispatcher := range dispatchers {
sub, ok := mockStore.spans[dispatcher.GetTableSpan().TableID]
v, ok := mockStore.spansMap.Load(dispatcher.GetTableSpan().TableID)
if !ok {
continue
}
sub.update(sub.watermark + 1)
spanStats := v.(*mockSpanStats)
spanStats.update(spanStats.watermark.Load()+1, nil)
}
log.Info("send resolvedTs events for 1 million tables", zap.Duration("cost", time.Since(sendStart)), zap.Any("round", round))
round++
Expand Down
80 changes: 60 additions & 20 deletions pkg/eventservice/event_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ package eventservice
import (
"context"
"database/sql"
"math"
"sort"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -95,12 +98,14 @@ var _ eventstore.EventStore = &mockEventStore{}

// mockEventStore is a mock implementation of the EventStore interface
type mockEventStore struct {
spans map[common.TableID]*mockSpanStats
resolvedTsUpdateInterval time.Duration
spansMap sync.Map
}

func newMockEventStore() *mockEventStore {
func newMockEventStore(resolvedTsUpdateInterval int) *mockEventStore {
return &mockEventStore{
spans: make(map[common.TableID]*mockSpanStats),
resolvedTsUpdateInterval: time.Millisecond * time.Duration(resolvedTsUpdateInterval),
spansMap: sync.Map{},
}
}

Expand All @@ -109,6 +114,22 @@ func (m *mockEventStore) Name() string {
}

func (m *mockEventStore) Run(ctx context.Context) error {
// Loop all spans and notify the watermarkNotifier.
ticker := time.NewTicker(time.Millisecond * 100)
go func() {
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
m.spansMap.Range(func(key, value any) bool {
spanStats := value.(*mockSpanStats)
spanStats.watermarkNotifier(spanStats.watermark.Load() + 1)
return true
})
}
}
}()
return nil
}

Expand All @@ -128,13 +149,16 @@ func (m *mockEventStore) GetIterator(dispatcherID common.DispatcherID, dataRange
iter := &mockEventIterator{
events: make([]*common.RawKVEntry, 0),
}

for _, e := range m.spans[dataRange.Span.TableID].pendingEvents {
v, ok := m.spansMap.Load(dataRange.Span.TableID)
if !ok {
return nil, nil
}
spanStats := v.(*mockSpanStats)
for _, e := range spanStats.pendingEvents {
if e.CRTs > dataRange.StartTs && e.CRTs <= dataRange.EndTs {
iter.events = append(iter.events, e)
}
}

return iter, nil
}

Expand All @@ -146,13 +170,14 @@ func (m *mockEventStore) RegisterDispatcher(
notifier eventstore.WatermarkNotifier,
) error {
log.Info("subscribe table span", zap.Any("span", span), zap.Uint64("startTs", uint64(startTS)))
m.spans[span.TableID] = &mockSpanStats{
spanStats := &mockSpanStats{
startTs: uint64(startTS),
watermark: uint64(startTS),
watermarkNotifier: notifier,
eventObserver: observer,
pendingEvents: make([]*common.RawKVEntry, 0),
}
spanStats.watermark.Store(uint64(startTS))
m.spansMap.Store(span.TableID, spanStats)
return nil
}

Expand Down Expand Up @@ -197,7 +222,7 @@ func newMockSchemaStore() *mockSchemaStore {
return &mockSchemaStore{
DDLEvents: make(map[common.TableID][]commonEvent.DDLEvent),
TableInfo: make(map[common.TableID][]*common.TableInfo),
resolvedTs: 0,
resolvedTs: math.MaxUint64,
}
}

Expand Down Expand Up @@ -277,15 +302,15 @@ func (m *mockSchemaStore) FetchTableTriggerDDLEvents(tableFilter filter.Filter,

type mockSpanStats struct {
startTs uint64
watermark uint64
watermark atomic.Uint64
pendingEvents []*common.RawKVEntry
eventObserver func(watermark uint64)
watermarkNotifier func(watermark uint64)
}

func (m *mockSpanStats) update(watermark uint64, events ...*common.RawKVEntry) {
m.pendingEvents = append(m.pendingEvents, events...)
m.watermark = watermark
m.watermark.Store(watermark)
for _, e := range events {
m.eventObserver(e.CRTs)
}
Expand Down Expand Up @@ -373,8 +398,8 @@ func genEvents(helper *pevent.EventTestHelper, t *testing.T, ddl string, dmls ..
job := helper.DDL2Job(ddl)
schema := job.SchemaName
table := job.TableName
kvEvents1 := helper.DML2RawKv(schema, table, dmls...)
for _, e := range kvEvents1 {
kvEvents := helper.DML2RawKv(schema, table, dmls...)
for _, e := range kvEvents {
require.Equal(t, job.BinlogInfo.TableInfo.UpdateTS-1, e.StartTs)
require.Equal(t, job.BinlogInfo.TableInfo.UpdateTS+1, e.CRTs)
}
Expand All @@ -386,7 +411,7 @@ func genEvents(helper *pevent.EventTestHelper, t *testing.T, ddl string, dmls ..
TableName: job.TableName,
Query: ddl,
TableInfo: common.WrapTableInfo(job.SchemaID, job.SchemaName, job.BinlogInfo.TableInfo),
}, kvEvents1
}, kvEvents
}

// This test is to test the mockEventIterator works as expected.
Expand Down Expand Up @@ -441,7 +466,9 @@ func TestEventServiceBasic(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

mockStore := newMockEventStore()
mockStore := newMockEventStore(100)
mockStore.Run(ctx)

mc := &mockMessageCenter{
messageCh: make(chan *messaging.TargetMessage, 100),
}
Expand All @@ -457,7 +484,7 @@ func TestEventServiceBasic(t *testing.T) {
require.Equal(t, 1, len(esImpl.brokers))
require.NotNil(t, esImpl.brokers[dispatcherInfo.GetClusterID()])

// add events to logpuller
// add events to eventStore
helper := pevent.NewEventTestHelper(t)
defer helper.Close()
ddlEvent, kvEvents := genEvents(helper, t, `create table test.t(id int primary key, c char(50))`, []string{
Expand All @@ -466,10 +493,11 @@ func TestEventServiceBasic(t *testing.T) {
`insert into test.t(id,c) values (2, "c2")`,
}...)
require.NotNil(t, kvEvents)

sourceSpanStat, ok := mockStore.spans[dispatcherInfo.span.TableID]
v, ok := mockStore.spansMap.Load(dispatcherInfo.span.TableID)
require.True(t, ok)

sourceSpanStat := v.(*mockSpanStats)
// add events to eventStore
sourceSpanStat.update(kvEvents[0].CRTs, kvEvents...)
schemastore := esImpl.schemaStore.(*mockSchemaStore)
schemastore.AppendDDLEvent(dispatcherInfo.span.TableID, ddlEvent)
Expand All @@ -481,22 +509,34 @@ func TestEventServiceBasic(t *testing.T) {
for _, m := range msg.Message {
msgCnt++
switch e := m.(type) {
case *commonEvent.HandshakeEvent:
require.NotNil(t, msg)
require.Equal(t, "event-collector", msg.Topic)
require.Equal(t, dispatcherInfo.id, e.DispatcherID)
require.Equal(t, dispatcherInfo.startTs, e.GetStartTs())
require.Equal(t, uint64(1), e.Seq)
log.Info("receive handshake event", zap.Any("event", e))
case *commonEvent.DMLEvent:
require.NotNil(t, msg)
require.Equal(t, "event-collector", msg.Topic)
require.Equal(t, len(kvEvents), e.Len())
require.Equal(t, kvEvents[0].CRTs, e.CommitTs)
require.Equal(t, uint64(2), e.Seq)
log.Info("receive dml event", zap.Any("event", e))
case *commonEvent.DDLEvent:
require.NotNil(t, msg)
require.Equal(t, "event-collector", msg.Topic)
require.Equal(t, ddlEvent.FinishedTs, e.FinishedTs)
require.Equal(t, uint64(3), e.Seq)
log.Info("receive ddl event", zap.Any("event", e))
case *commonEvent.BatchResolvedEvent:
require.NotNil(t, msg)
log.Info("received watermark", zap.Uint64("ts", e.Events[0].ResolvedTs))
log.Info("receive watermark", zap.Uint64("ts", e.Events[0].ResolvedTs))
}
}
if msgCnt == 3 {
if msgCnt == 4 {
break
}
}

}
5 changes: 1 addition & 4 deletions pkg/eventservice/handler.go → pkg/eventservice/helper.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package eventservice

import (
"context"

"github.com/flowbehappy/tigate/pkg/common"
"github.com/pingcap/log"
)
Expand All @@ -23,8 +21,7 @@ func (h *dispatcherEventsHandler) Handle(broker *eventBroker, tasks ...scanTask)
log.Panic("only one task is allowed")
}
task := tasks[0]
ctx := context.Background()
needScan, _ := broker.checkNeedScan(ctx, task)
needScan, _ := broker.checkNeedScan(task)
if !needScan {
task.handle()
return false
Expand Down
1 change: 0 additions & 1 deletion pkg/messaging/message_center.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,5 +361,4 @@ func (s *grpcServer) handleConnect(msg *proto.Message, stream grpcSender, isEven
} else {
return remoteTarget.runCommandSendStream(stream)
}

}
Loading

0 comments on commit 9538b85

Please sign in to comment.