Skip to content

Commit

Permalink
dispatcher: Support UpdateSchemas for Rename Table and Rename Tables (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
hongyunyan authored Oct 9, 2024
1 parent 579ea47 commit 45240db
Show file tree
Hide file tree
Showing 6 changed files with 501 additions and 154 deletions.
49 changes: 36 additions & 13 deletions downstreamadapter/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,27 +84,29 @@ type Dispatcher struct {
resendTask *ResendTask
checkTableProgressEmptyTask *CheckProgressEmptyTask

schemaID int64
schemaIDToDispatchers *SchemaIDToDispatchers
schemaID int64

// only exist when the dispatcher is a table trigger event dispatcher
tableNameStore *TableNameStore
}

func NewDispatcher(id common.DispatcherID, tableSpan *heartbeatpb.TableSpan, sink tisink.Sink, startTs uint64, statusesChan chan *heartbeatpb.TableSpanStatus, filter filter.Filter, schemaID int64) *Dispatcher {
func NewDispatcher(id common.DispatcherID, tableSpan *heartbeatpb.TableSpan, sink tisink.Sink, startTs uint64, statusesChan chan *heartbeatpb.TableSpanStatus, filter filter.Filter, schemaID int64, schemaIDToDispatchers *SchemaIDToDispatchers) *Dispatcher {
dispatcher := &Dispatcher{
id: id,
tableSpan: tableSpan,
sink: sink,
statusesChan: statusesChan,
//SyncPointInfo: syncPointInfo,
//MemoryUsage: NewMemoryUsage(),
componentStatus: newComponentStateWithMutex(heartbeatpb.ComponentState_Working),
resolvedTs: newTsWithMutex(startTs),
filter: filter,
isRemoving: atomic.Bool{},
ddlPendingEvent: nil,
tableProgress: types.NewTableProgress(),
schemaID: schemaID,
componentStatus: newComponentStateWithMutex(heartbeatpb.ComponentState_Working),
resolvedTs: newTsWithMutex(startTs),
filter: filter,
isRemoving: atomic.Bool{},
ddlPendingEvent: nil,
tableProgress: types.NewTableProgress(),
schemaID: schemaID,
schemaIDToDispatchers: schemaIDToDispatchers,
}

// only when is not mysql sink, table trigger event dispatcher need tableNameStore to store the table name
Expand Down Expand Up @@ -249,11 +251,36 @@ func (d *Dispatcher) DealWithDDLWhenProgressEmpty() {
BlockTables: d.ddlPendingEvent.GetBlockedTables().ToPB(),
NeedDroppedTables: d.ddlPendingEvent.GetNeedDroppedTables().ToPB(),
NeedAddedTables: common.ToTablesPB(d.ddlPendingEvent.GetNeedAddedTables()),
UpdatedSchemas: common.ToSchemaIDChangePB(d.ddlPendingEvent.GetUpdatedSchemas()), // only exists for rename table and rename tables
},
}
d.SetResendTask(newResendTask(message, d))
d.statusesChan <- message
}

// dealing with events which update schema ids
// Only rename table and rename tables may update schema ids(rename db1.table1 to db2.table2)
// Here we directly update schema id of dispatcher when we begin to handle the ddl event,
// but not waiting maintainer response for ready to write/pass the ddl event.
// Because the schemaID of each dispatcher is only use to dealing with the db-level ddl event(like drop db) or drop table.
// Both the rename table/rename tables, drop table and db-level ddl event will be send to the table trigger event dispatcher in order.
// So there won't be a related db-level ddl event is in dealing when we get update schema id events.
// Thus, whether to update schema id before or after current ddl event is not important.
// To make it easier, we choose to directly update schema id here.
if d.ddlPendingEvent.GetUpdatedSchemas() != nil && d.tableSpan != heartbeatpb.DDLSpan {
for _, schemaIDChange := range d.ddlPendingEvent.GetUpdatedSchemas() {
if schemaIDChange.TableID == d.tableSpan.TableID {
if schemaIDChange.OldSchemaID != d.schemaID {
log.Error("Wrong Schema ID", zap.Any("dispatcherID", d.id), zap.Any("except schemaID", schemaIDChange.OldSchemaID), zap.Any("actual schemaID", d.schemaID), zap.Any("tableSpan", d.tableSpan.String()))
return
} else {
d.schemaID = schemaIDChange.NewSchemaID
d.schemaIDToDispatchers.Update(schemaIDChange.OldSchemaID, schemaIDChange.NewSchemaID)
return
}
}
}
}
}

func (d *Dispatcher) GetTableSpan() *heartbeatpb.TableSpan {
Expand Down Expand Up @@ -306,10 +333,6 @@ func (d *Dispatcher) GetSchemaID() int64 {
// return d.syncPointInfo
// }

// func (d *Dispatcher) GetMemoryUsage() *MemoryUsage {
// return d.MemoryUsage
// }

func (d *Dispatcher) Remove() {
// TODO: 修改这个 dispatcher 的 status 为 removing
log.Info("table event dispatcher component status changed to stopping", zap.String("table", d.tableSpan.String()))
Expand Down
56 changes: 55 additions & 1 deletion downstreamadapter/dispatcher/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,62 @@ import (
"github.com/flowbehappy/tigate/pkg/common"
"github.com/flowbehappy/tigate/utils/dynstream"
"github.com/flowbehappy/tigate/utils/threadpool"
"github.com/pingcap/log"
"go.uber.org/zap"
)

type SchemaIDToDispatchers struct {
mutex sync.RWMutex
m map[int64]map[common.DispatcherID]interface{}
}

func NewSchemaIDToDispatchers() *SchemaIDToDispatchers {
return &SchemaIDToDispatchers{
m: make(map[int64]map[common.DispatcherID]interface{}),
}
}

func (s *SchemaIDToDispatchers) Set(schemaID int64, dispatcherID common.DispatcherID) {
s.mutex.Lock()
defer s.mutex.Unlock()
if _, ok := s.m[schemaID]; !ok {
s.m[schemaID] = make(map[common.DispatcherID]interface{})
}
s.m[schemaID][dispatcherID] = struct{}{}
}

func (s *SchemaIDToDispatchers) Delete(schemaID int64, dispatcherID common.DispatcherID) {
s.mutex.Lock()
defer s.mutex.Unlock()
if _, ok := s.m[schemaID]; ok {
delete(s.m[schemaID], dispatcherID)
}
}

func (s *SchemaIDToDispatchers) Update(oldSchemaID int64, newSchemaID int64) {
s.mutex.Lock()
defer s.mutex.Unlock()
if _, ok := s.m[oldSchemaID]; ok {
s.m[newSchemaID] = s.m[oldSchemaID]
delete(s.m, oldSchemaID)
} else {
log.Error("schemaID not found", zap.Any("schemaID", oldSchemaID))
}
}

func (s *SchemaIDToDispatchers) GetDispatcherIDs(schemaID int64) []common.DispatcherID {
s.mutex.RLock()
defer s.mutex.RUnlock()
if ids, ok := s.m[schemaID]; ok {
dispatcherIDs := make([]common.DispatcherID, 0, len(ids))
for id := range ids {
dispatcherIDs = append(dispatcherIDs, id)
}
return dispatcherIDs
}
return nil
}

type SyncPointInfo struct {
EnableSyncPoint bool
SyncPointInterval time.Duration
Expand Down Expand Up @@ -179,7 +233,7 @@ func newResendTask(message *heartbeatpb.TableSpanStatus, dispatcher *Dispatcher)

func (t *ResendTask) Execute() time.Time {
t.dispatcher.GetStatusesChan() <- t.message
return time.Now().Add(50 * time.Millisecond)
return time.Now().Add(200 * time.Millisecond)
}

func (t *ResendTask) Cancel() {
Expand Down
53 changes: 4 additions & 49 deletions downstreamadapter/dispatchermanager/event_dispatcher_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type EventDispatcherManager struct {

heartBeatTask *HeartBeatTask

schemaIDToDispatchers *SchemaIDToDispatchers
schemaIDToDispatchers *dispatcher.SchemaIDToDispatchers

tableTriggerEventDispatcher *dispatcher.Dispatcher

Expand All @@ -99,7 +99,7 @@ func NewEventDispatcherManager(changefeedID model.ChangeFeedID,
statusesChan: make(chan *heartbeatpb.TableSpanStatus, 10000),
cancel: cancel,
config: cfConfig,
schemaIDToDispatchers: newSchemaIDToDispatchers(),
schemaIDToDispatchers: dispatcher.NewSchemaIDToDispatchers(),
tableEventDispatcherCount: metrics.TableEventDispatcherGauge.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricCreateDispatcherDuration: metrics.CreateDispatcherDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricCheckpointTs: metrics.EventDispatcherManagerCheckpointTsGauge.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
Expand Down Expand Up @@ -205,7 +205,7 @@ func (e *EventDispatcherManager) NewDispatcher(id common.DispatcherID, tableSpan
return nil
}

dispatcher := dispatcher.NewDispatcher(id, tableSpan, e.sink, startTs, e.statusesChan, e.filter, schemaID)
dispatcher := dispatcher.NewDispatcher(id, tableSpan, e.sink, startTs, e.statusesChan, e.filter, schemaID, e.schemaIDToDispatchers)

if tableSpan.Equal(heartbeatpb.DDLSpan) {
e.tableTriggerEventDispatcher = dispatcher
Expand Down Expand Up @@ -397,10 +397,6 @@ func (e *EventDispatcherManager) GetDispatcherMap() *DispatcherMap {
return e.dispatcherMap
}

func (e *EventDispatcherManager) GetSchemaIDToDispatchers() *SchemaIDToDispatchers {
return e.schemaIDToDispatchers
}

func (e *EventDispatcherManager) GetMaintainerID() node.ID {
return e.maintainerID
}
Expand All @@ -427,7 +423,7 @@ func (e *EventDispatcherManager) SetMaintainerID(maintainerID node.ID) {

// Get all dispatchers id of the specified schemaID. Including the tableTriggerEventDispatcherID if exists.
func (e *EventDispatcherManager) GetAllDispatchers(schemaID int64) []common.DispatcherID {
dispatcherIDs := e.GetSchemaIDToDispatchers().GetDispatcherIDs(schemaID)
dispatcherIDs := e.schemaIDToDispatchers.GetDispatcherIDs(schemaID)
if e.tableTriggerEventDispatcher != nil {
dispatcherIDs = append(dispatcherIDs, e.tableTriggerEventDispatcher.GetId())
}
Expand Down Expand Up @@ -475,44 +471,3 @@ func (d *DispatcherMap) ForEach(fn func(id common.DispatcherID, dispatcher *disp
return true
})
}

type SchemaIDToDispatchers struct {
mutex sync.RWMutex
m map[int64]map[common.DispatcherID]interface{}
}

func newSchemaIDToDispatchers() *SchemaIDToDispatchers {
return &SchemaIDToDispatchers{
m: make(map[int64]map[common.DispatcherID]interface{}),
}
}

func (s *SchemaIDToDispatchers) Set(schemaID int64, dispatcherID common.DispatcherID) {
s.mutex.Lock()
defer s.mutex.Unlock()
if _, ok := s.m[schemaID]; !ok {
s.m[schemaID] = make(map[common.DispatcherID]interface{})
}
s.m[schemaID][dispatcherID] = struct{}{}
}

func (s *SchemaIDToDispatchers) Delete(schemaID int64, dispatcherID common.DispatcherID) {
s.mutex.Lock()
defer s.mutex.Unlock()
if _, ok := s.m[schemaID]; ok {
delete(s.m[schemaID], dispatcherID)
}
}

func (s *SchemaIDToDispatchers) GetDispatcherIDs(schemaID int64) []common.DispatcherID {
s.mutex.RLock()
defer s.mutex.RUnlock()
if ids, ok := s.m[schemaID]; ok {
dispatcherIDs := make([]common.DispatcherID, 0, len(ids))
for id := range ids {
dispatcherIDs = append(dispatcherIDs, id)
}
return dispatcherIDs
}
return nil
}
Loading

0 comments on commit 45240db

Please sign in to comment.