diff --git a/logservice/schemastore/ddl_job_fetcher.go b/logservice/schemastore/ddl_job_fetcher.go index 5b314291e..f89c3a71d 100644 --- a/logservice/schemastore/ddl_job_fetcher.go +++ b/logservice/schemastore/ddl_job_fetcher.go @@ -22,6 +22,7 @@ import ( "github.com/flowbehappy/tigate/pkg/common" "github.com/flowbehappy/tigate/pkg/mounter" "github.com/pingcap/errors" + "github.com/pingcap/log" "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/parser/model" @@ -31,6 +32,7 @@ import ( "github.com/pingcap/tiflow/pkg/security" "github.com/tikv/client-go/v2/tikv" pd "github.com/tikv/pd/client" + "go.uber.org/zap" ) type ddlJobFetcher struct { @@ -56,7 +58,7 @@ func newDDLJobFetcher( ) *ddlJobFetcher { clientConfig := &logpuller.SubscriptionClientConfig{ RegionRequestWorkerPerStore: 1, - ChangeEventProcessorNum: 4, + ChangeEventProcessorNum: 1, // must be 1, because ddlJobFetcher.input cannot be called concurrently AdvanceResolvedTsIntervalInMs: 100, } client := logpuller.NewSubscriptionClient( @@ -120,8 +122,10 @@ func (p *ddlJobFetcher) unmarshalDDL(rawKV *common.RawKVEntry) (*model.Job, erro return nil, nil } if p.ddlTableInfo == nil && !mounter.IsLegacyFormatJob(rawKV) { + log.Info("begin to init ddl table info") err := p.initDDLTableInfo() if err != nil { + log.Error("init ddl table info failed", zap.Error(err)) return nil, errors.Trace(err) } } diff --git a/logservice/schemastore/disk_format.go b/logservice/schemastore/disk_format.go index 9a7129376..e79ff5118 100644 --- a/logservice/schemastore/disk_format.go +++ b/logservice/schemastore/disk_format.go @@ -183,8 +183,13 @@ func loadDatabasesInKVSnap(snap *pebble.Snapshot, gcTs uint64) (map[int64]*Basic return databaseMap, nil } -func loadTablesInKVSnap(snap *pebble.Snapshot, gcTs uint64, databaseMap map[int64]*BasicDatabaseInfo) (map[int64]*BasicTableInfo, error) { +func loadTablesInKVSnap( + snap *pebble.Snapshot, + gcTs uint64, + databaseMap map[int64]*BasicDatabaseInfo, +) (map[int64]*BasicTableInfo, map[int64]BasicPartitionInfo, error) { tablesInKVSnap := make(map[int64]*BasicTableInfo) + partitionsInKVSnap := make(map[int64]BasicPartitionInfo) startKey, err := tableInfoKey(gcTs, 0) if err != nil { @@ -208,26 +213,32 @@ func loadTablesInKVSnap(snap *pebble.Snapshot, gcTs uint64, databaseMap map[int6 log.Fatal("unmarshal table info entry failed", zap.Error(err)) } - tbNameInfo := model.TableNameInfo{} - if err := json.Unmarshal(table_info_entry.TableInfoValue, &tbNameInfo); err != nil { - log.Fatal("unmarshal table name info failed", zap.Error(err)) + tableInfo := model.TableInfo{} + if err := json.Unmarshal(table_info_entry.TableInfoValue, &tableInfo); err != nil { + log.Fatal("unmarshal table info failed", zap.Error(err)) } databaseInfo, ok := databaseMap[table_info_entry.SchemaID] if !ok { log.Panic("database not found", zap.Int64("schemaID", table_info_entry.SchemaID), zap.String("schemaName", table_info_entry.SchemaName), - zap.String("tableName", tbNameInfo.Name.O)) + zap.String("tableName", tableInfo.Name.O)) } // TODO: add a unit test for this case - databaseInfo.Tables[tbNameInfo.ID] = true - tablesInKVSnap[tbNameInfo.ID] = &BasicTableInfo{ + databaseInfo.Tables[tableInfo.ID] = true + tablesInKVSnap[tableInfo.ID] = &BasicTableInfo{ SchemaID: table_info_entry.SchemaID, - Name: tbNameInfo.Name.O, + Name: tableInfo.Name.O, + } + if tableInfo.Partition != nil { + partitionInfo := make(BasicPartitionInfo) + for _, partition := range tableInfo.Partition.Definitions { + partitionInfo[partition.ID] = nil + } + partitionsInKVSnap[tableInfo.ID] = partitionInfo } } - - return tablesInKVSnap, nil + return tablesInKVSnap, partitionsInKVSnap, nil } // load the ddl jobs in the range (gcTs, upperBound] and apply the ddl job to update database and table info @@ -237,6 +248,7 @@ func loadAndApplyDDLHistory( maxFinishedDDLTs uint64, databaseMap map[int64]*BasicDatabaseInfo, tableMap map[int64]*BasicTableInfo, + partitionMap map[int64]BasicPartitionInfo, ) (map[int64][]uint64, []uint64, error) { tablesDDLHistory := make(map[int64][]uint64) tableTriggerDDLHistory := make([]uint64, 0) @@ -279,7 +291,7 @@ func loadAndApplyDDLHistory( tableTriggerDDLHistory); err != nil { log.Panic("updateDDLHistory error", zap.Error(err)) } - if err := updateDatabaseInfoAndTableInfo(&ddlEvent, databaseMap, tableMap); err != nil { + if err := updateDatabaseInfoAndTableInfo(&ddlEvent, databaseMap, tableMap, partitionMap); err != nil { log.Panic("updateDatabaseInfo error", zap.Error(err)) } } @@ -333,6 +345,17 @@ func readPersistedDDLEvent(snap *pebble.Snapshot, version uint64) PersistedDDLEv if err := json.Unmarshal(ddlEvent.TableInfoValue, &ddlEvent.TableInfo); err != nil { log.Fatal("unmarshal table info failed", zap.Error(err)) } + ddlEvent.TableInfoValue = nil + + if len(ddlEvent.MultipleTableInfosValue) > 0 { + ddlEvent.MultipleTableInfos = make([]*model.TableInfo, len(ddlEvent.MultipleTableInfosValue)) + for i := range ddlEvent.MultipleTableInfosValue { + if err := json.Unmarshal(ddlEvent.MultipleTableInfosValue[i], &ddlEvent.MultipleTableInfos[i]); err != nil { + log.Fatal("unmarshal multi table info failed", zap.Error(err)) + } + } + } + ddlEvent.MultipleTableInfosValue = nil return ddlEvent } @@ -346,6 +369,15 @@ func writePersistedDDLEvent(db *pebble.DB, ddlEvent *PersistedDDLEvent) error { if err != nil { return err } + if len(ddlEvent.MultipleTableInfos) > 0 { + ddlEvent.MultipleTableInfosValue = make([][]byte, len(ddlEvent.MultipleTableInfos)) + for i := range ddlEvent.MultipleTableInfos { + ddlEvent.MultipleTableInfosValue[i], err = json.Marshal(ddlEvent.MultipleTableInfos[i]) + if err != nil { + return err + } + } + } ddlValue, err := ddlEvent.MarshalMsg(nil) if err != nil { return err @@ -526,12 +558,13 @@ func loadAllPhysicalTablesAtTs( return nil, err } - tableMap, err := loadTablesInKVSnap(storageSnap, gcTs, databaseMap) + tableMap, partitionMap, err := loadTablesInKVSnap(storageSnap, gcTs, databaseMap) if err != nil { return nil, err } log.Info("after load tables in kv snap", - zap.Int("tableMapLen", len(tableMap))) + zap.Int("tableMapLen", len(tableMap)), + zap.Int("partitionMapLen", len(partitionMap))) // apply ddl jobs in range (gcTs, snapVersion] startKey, err := ddlJobKey(gcTs + 1) @@ -559,12 +592,13 @@ func loadAllPhysicalTablesAtTs( if err := json.Unmarshal(ddlEvent.TableInfoValue, &ddlEvent.TableInfo); err != nil { log.Fatal("unmarshal table info failed", zap.Error(err)) } - if err := updateDatabaseInfoAndTableInfo(&ddlEvent, databaseMap, tableMap); err != nil { + if err := updateDatabaseInfoAndTableInfo(&ddlEvent, databaseMap, tableMap, partitionMap); err != nil { log.Panic("updateDatabaseInfo error", zap.Error(err)) } } log.Info("after load tables from ddl", - zap.Int("tableMapLen", len(tableMap))) + zap.Int("tableMapLen", len(tableMap)), + zap.Int("partitionMapLen", len(partitionMap))) tables := make([]common.Table, 0) for tableID, tableInfo := range tableMap { if _, ok := databaseMap[tableInfo.SchemaID]; !ok { @@ -577,10 +611,19 @@ func loadAllPhysicalTablesAtTs( if tableFilter != nil && tableFilter.ShouldIgnoreTable(databaseMap[tableInfo.SchemaID].Name, tableInfo.Name) { continue } - tables = append(tables, common.Table{ - SchemaID: tableInfo.SchemaID, - TableID: tableID, - }) + if partitionInfo, ok := partitionMap[tableID]; ok { + for partitionID := range partitionInfo { + tables = append(tables, common.Table{ + SchemaID: tableInfo.SchemaID, + TableID: partitionID, + }) + } + } else { + tables = append(tables, common.Table{ + SchemaID: tableInfo.SchemaID, + TableID: tableID, + }) + } } return tables, nil } diff --git a/logservice/schemastore/multi_version.go b/logservice/schemastore/multi_version.go index 10d9ffb72..7895c3a25 100644 --- a/logservice/schemastore/multi_version.go +++ b/logservice/schemastore/multi_version.go @@ -84,7 +84,7 @@ func (v *versionedTableInfoStore) setTableInfoInitialized() { // zap.String("query", job.Query), // zap.Uint64("finishedTS", job.BinlogInfo.FinishedTS), // zap.Any("infosLen", len(v.infos))) - v.doApplyDDL(job) + v.doApplyDDL(&job) } v.initialized = true close(v.readyToRead) @@ -155,7 +155,7 @@ func (v *versionedTableInfoStore) gc(gcTs uint64) bool { return false } -func assertEmpty(infos []*tableInfoItem, event PersistedDDLEvent) { +func assertEmpty(infos []*tableInfoItem, event *PersistedDDLEvent) { if len(infos) != 0 { log.Panic("shouldn't happen", zap.Any("infosLen", len(infos)), @@ -168,7 +168,7 @@ func assertEmpty(infos []*tableInfoItem, event PersistedDDLEvent) { } } -func assertNonEmpty(infos []*tableInfoItem, event PersistedDDLEvent) { +func assertNonEmpty(infos []*tableInfoItem, event *PersistedDDLEvent) { if len(infos) == 0 { log.Panic("shouldn't happen", zap.Any("infos", infos), @@ -182,7 +182,7 @@ func assertNonDeleted(v *versionedTableInfoStore) { } } -func (v *versionedTableInfoStore) applyDDLFromPersistStorage(event PersistedDDLEvent) { +func (v *versionedTableInfoStore) applyDDLFromPersistStorage(event *PersistedDDLEvent) { v.mu.Lock() defer v.mu.Unlock() if v.initialized { @@ -192,21 +192,22 @@ func (v *versionedTableInfoStore) applyDDLFromPersistStorage(event PersistedDDLE v.doApplyDDL(event) } -func (v *versionedTableInfoStore) applyDDL(event PersistedDDLEvent) { +func (v *versionedTableInfoStore) applyDDL(event *PersistedDDLEvent) { v.mu.Lock() defer v.mu.Unlock() // delete table should not receive more ddl assertNonDeleted(v) if !v.initialized { - v.pendingDDLs = append(v.pendingDDLs, event) + // The usage of the parameter `event` may outlive the function call, so we copy it. + v.pendingDDLs = append(v.pendingDDLs, *event) return } v.doApplyDDL(event) } // lock must be hold by the caller -func (v *versionedTableInfoStore) doApplyDDL(event PersistedDDLEvent) { +func (v *versionedTableInfoStore) doApplyDDL(event *PersistedDDLEvent) { // TODO: add a unit test // TODO: whether need add schema version check if len(v.infos) != 0 && event.FinishedTs <= v.infos[len(v.infos)-1].version { @@ -217,6 +218,11 @@ func (v *versionedTableInfoStore) doApplyDDL(event PersistedDDLEvent) { zap.Int("infosLen", len(v.infos))) return } + appendTableInfo := func() { + info := common.WrapTableInfo(event.CurrentSchemaID, event.CurrentSchemaName, event.FinishedTs, event.TableInfo) + info.InitPreSQLs() + v.infos = append(v.infos, &tableInfoItem{version: uint64(event.FinishedTs), info: info}) + } switch model.ActionType(event.Type) { case model.ActionCreateTable: @@ -229,30 +235,125 @@ func (v *versionedTableInfoStore) doApplyDDL(event PersistedDDLEvent) { break } assertEmpty(v.infos, event) - info := common.WrapTableInfo(event.CurrentSchemaID, event.CurrentSchemaName, event.FinishedTs, event.TableInfo) - info.InitPreSQLs() - v.infos = append(v.infos, &tableInfoItem{version: uint64(event.FinishedTs), info: info}) - case model.ActionRenameTable, - model.ActionAddColumn, - model.ActionDropColumn: - assertNonEmpty(v.infos, event) - info := common.WrapTableInfo(event.CurrentSchemaID, event.CurrentSchemaName, event.FinishedTs, event.TableInfo) - info.InitPreSQLs() - v.infos = append(v.infos, &tableInfoItem{version: uint64(event.FinishedTs), info: info}) + appendTableInfo() case model.ActionDropTable: v.deleteVersion = uint64(event.FinishedTs) + case model.ActionAddColumn, + model.ActionDropColumn: + assertNonEmpty(v.infos, event) + appendTableInfo() case model.ActionTruncateTable: - if v.tableID == event.CurrentTableID { - info := common.WrapTableInfo(event.CurrentSchemaID, event.CurrentSchemaName, event.FinishedTs, event.TableInfo) + if isPartitionTableEvent(event) { + createTable := false + for _, partition := range getAllPartitionIDs(event) { + if v.tableID == partition { + createTable = true + break + } + } + if createTable { + log.Info("create table for truncate table") + appendTableInfo() + } else { + v.deleteVersion = uint64(event.FinishedTs) + } + } else { + if v.tableID == event.CurrentTableID { + appendTableInfo() + } else { + if v.tableID != event.PrevTableID { + log.Panic("should not happen") + } + v.deleteVersion = uint64(event.FinishedTs) + } + } + case model.ActionRenameTable: + assertNonEmpty(v.infos, event) + appendTableInfo() + case model.ActionAddTablePartition: + newCreatedIDs := getCreatedIDs(event.PrevPartitions, getAllPartitionIDs(event)) + for _, partition := range newCreatedIDs { + if v.tableID == partition { + appendTableInfo() + break + } + } + case model.ActionDropTablePartition: + droppedIDs := getDroppedIDs(event.PrevPartitions, getAllPartitionIDs(event)) + for _, partition := range droppedIDs { + if v.tableID == partition { + v.deleteVersion = uint64(event.FinishedTs) + break + } + } + case model.ActionTruncateTablePartition: + physicalIDs := getAllPartitionIDs(event) + droppedIDs := getDroppedIDs(event.PrevPartitions, physicalIDs) + dropped := false + for _, partition := range droppedIDs { + if v.tableID == partition { + v.deleteVersion = uint64(event.FinishedTs) + dropped = true + break + } + } + if !dropped { + newCreatedIDs := getCreatedIDs(event.PrevPartitions, physicalIDs) + for _, partition := range newCreatedIDs { + if v.tableID == partition { + appendTableInfo() + break + } + } + } + case model.ActionExchangeTablePartition: + assertNonEmpty(v.infos, event) + lastRawTableInfo := v.infos[len(v.infos)-1].info.TableInfo.Clone() + // the previous normal table + if v.tableID == event.PrevTableID { + lastRawTableInfo.Name = model.NewCIStr(event.CurrentTableName) + info := common.WrapTableInfo(event.CurrentSchemaID, event.CurrentSchemaName, event.FinishedTs, lastRawTableInfo) info.InitPreSQLs() v.infos = append(v.infos, &tableInfoItem{version: uint64(event.FinishedTs), info: info}) } else { - if v.tableID != event.PrevTableID { - log.Panic("should not happen") + lastRawTableInfo.Name = model.NewCIStr(event.PrevTableName) + info := common.WrapTableInfo(event.PrevSchemaID, event.PrevSchemaName, event.FinishedTs, lastRawTableInfo) + info.InitPreSQLs() + v.infos = append(v.infos, &tableInfoItem{version: uint64(event.FinishedTs), info: info}) + } + case model.ActionCreateTables: + assertEmpty(v.infos, event) + for _, tableInfo := range event.MultipleTableInfos { + if v.tableID == tableInfo.ID { + info := common.WrapTableInfo(event.CurrentSchemaID, event.CurrentSchemaName, event.FinishedTs, tableInfo) + info.InitPreSQLs() + v.infos = append(v.infos, &tableInfoItem{version: uint64(event.FinishedTs), info: info}) + break + } + } + case model.ActionReorganizePartition: + physicalIDs := getAllPartitionIDs(event) + droppedIDs := getDroppedIDs(event.PrevPartitions, physicalIDs) + dropped := false + for _, partition := range droppedIDs { + if v.tableID == partition { + v.deleteVersion = uint64(event.FinishedTs) + dropped = true + break + } + } + if !dropped { + newCreatedIDs := getCreatedIDs(event.PrevPartitions, physicalIDs) + for _, partition := range newCreatedIDs { + if v.tableID == partition { + appendTableInfo() + break + } } - v.deleteVersion = uint64(event.FinishedTs) } default: - // TODO: idenitify unexpected ddl or specify all expected ddl + log.Panic("not supported ddl type", + zap.Any("ddlType", event.Type), + zap.String("DDL", event.Query)) } } diff --git a/logservice/schemastore/multi_version_test.go b/logservice/schemastore/multi_version_test.go index d93a58ed8..88deac467 100644 --- a/logservice/schemastore/multi_version_test.go +++ b/logservice/schemastore/multi_version_test.go @@ -27,7 +27,7 @@ func TestCreateTruncateAndDropTable(t *testing.T) { store1.setTableInfoInitialized() createVersion := uint64(300) { - createDDLJob := PersistedDDLEvent{ + createDDLEvent := &PersistedDDLEvent{ Type: byte(model.ActionCreateTable), CurrentSchemaID: 10, CurrentTableID: tableID1, @@ -39,7 +39,7 @@ func TestCreateTruncateAndDropTable(t *testing.T) { }, FinishedTs: createVersion, } - store1.applyDDL(createDDLJob) + store1.applyDDL(createDDLEvent) } tableID2 := tableID1 + 1 @@ -47,7 +47,7 @@ func TestCreateTruncateAndDropTable(t *testing.T) { store2.setTableInfoInitialized() truncateVersion := createVersion + 10 { - truncateDDLJob := PersistedDDLEvent{ + truncateDDLEvent := &PersistedDDLEvent{ Type: byte(model.ActionTruncateTable), CurrentSchemaID: 10, CurrentTableID: tableID2, @@ -60,13 +60,13 @@ func TestCreateTruncateAndDropTable(t *testing.T) { }, FinishedTs: truncateVersion, } - store1.applyDDL(truncateDDLJob) - store2.applyDDL(truncateDDLJob) + store1.applyDDL(truncateDDLEvent) + store2.applyDDL(truncateDDLEvent) } dropVersion := truncateVersion + 10 { - dropDDLJob := PersistedDDLEvent{ + dropDDLEvent := &PersistedDDLEvent{ Type: byte(model.ActionDropTable), CurrentSchemaID: 10, CurrentTableID: tableID2, @@ -78,7 +78,7 @@ func TestCreateTruncateAndDropTable(t *testing.T) { }, FinishedTs: dropVersion, } - store2.applyDDL(dropDDLJob) + store2.applyDDL(dropDDLEvent) } { @@ -106,7 +106,7 @@ func TestRenameTable(t *testing.T) { createVersion := uint64(100) schemaID1 := int64(10) { - createDDLJob := PersistedDDLEvent{ + createDDLEvent := &PersistedDDLEvent{ Type: byte(model.ActionCreateTable), CurrentSchemaID: 10, CurrentTableID: tableID, @@ -118,13 +118,13 @@ func TestRenameTable(t *testing.T) { }, FinishedTs: createVersion, } - store.applyDDL(createDDLJob) + store.applyDDL(createDDLEvent) } renameVersion := createVersion + 10 schemaID2 := schemaID1 + 100 { - renameDDLJob := PersistedDDLEvent{ + renameDDLEvent := &PersistedDDLEvent{ Type: byte(model.ActionRenameTable), CurrentSchemaID: schemaID2, CurrentTableID: tableID, @@ -139,7 +139,7 @@ func TestRenameTable(t *testing.T) { }, FinishedTs: renameVersion, } - store.applyDDL(renameDDLJob) + store.applyDDL(renameDDLEvent) } { diff --git a/logservice/schemastore/persist_storage.go b/logservice/schemastore/persist_storage.go index 3c6dea78d..ef750ee89 100644 --- a/logservice/schemastore/persist_storage.go +++ b/logservice/schemastore/persist_storage.go @@ -18,6 +18,7 @@ import ( "fmt" "os" "sort" + "strings" "sync" "time" @@ -57,6 +58,8 @@ type persistentStorage struct { tableMap map[int64]*BasicTableInfo + partitionMap map[int64]BasicPartitionInfo + // schemaID -> database info // it contains all databases and deleted databases // will only be removed when its delete version is smaller than gc ts @@ -199,7 +202,7 @@ func (p *persistentStorage) initializeFromDisk() { log.Fatal("load database info from disk failed") } - if p.tableMap, err = loadTablesInKVSnap(storageSnap, p.gcTs, p.databaseMap); err != nil { + if p.tableMap, p.partitionMap, err = loadTablesInKVSnap(storageSnap, p.gcTs, p.databaseMap); err != nil { log.Fatal("load tables in kv snapshot failed") } @@ -208,7 +211,8 @@ func (p *persistentStorage) initializeFromDisk() { p.gcTs, p.upperBound.FinishedDDLTs, p.databaseMap, - p.tableMap); err != nil { + p.tableMap, + p.partitionMap); err != nil { log.Fatal("fail to initialize from disk") } } @@ -292,25 +296,26 @@ func (p *persistentStorage) fetchTableDDLEvents(tableID int64, tableFilter filte // TODO: check a dispatcher from rename table start ts > finish ts of rename table(is it possible?) p.mu.RLock() if start < p.gcTs { - p.mu.Unlock() + p.mu.RUnlock() return nil, fmt.Errorf("startTs %d is smaller than gcTs %d", start, p.gcTs) } // fast check - if len(p.tablesDDLHistory[tableID]) == 0 || start >= p.tablesDDLHistory[tableID][len(p.tablesDDLHistory[tableID])-1] { + history := p.tablesDDLHistory[tableID] + if len(history) == 0 || start >= history[len(history)-1] { p.mu.RUnlock() return nil, nil } - index := sort.Search(len(p.tablesDDLHistory[tableID]), func(i int) bool { - return p.tablesDDLHistory[tableID][i] > start + index := sort.Search(len(history), func(i int) bool { + return history[i] > start }) - if index == len(p.tablesDDLHistory[tableID]) { + if index == len(history) { log.Panic("should not happen") } // copy all target ts to a new slice allTargetTs := make([]uint64, 0) - for i := index; i < len(p.tablesDDLHistory[tableID]); i++ { - if p.tablesDDLHistory[tableID][i] <= end { - allTargetTs = append(allTargetTs, p.tablesDDLHistory[tableID][i]) + for i := index; i < len(history); i++ { + if history[i] <= end { + allTargetTs = append(allTargetTs, history[i]) } } @@ -387,10 +392,24 @@ func (p *persistentStorage) fetchTableTriggerDDLEvents(tableFilter filter.Filter } for _, ts := range allTargetTs { rawEvent := readPersistedDDLEvent(storageSnap, ts) - if tableFilter != nil && - tableFilter.ShouldDiscardDDL(model.ActionType(rawEvent.Type), rawEvent.CurrentSchemaName, rawEvent.CurrentTableName) && - tableFilter.ShouldDiscardDDL(model.ActionType(rawEvent.Type), rawEvent.PrevSchemaName, rawEvent.PrevTableName) { - continue + if tableFilter != nil { + if rawEvent.Type == byte(model.ActionCreateTables) { + allFiltered := true + for _, tableInfo := range rawEvent.MultipleTableInfos { + if !tableFilter.ShouldDiscardDDL(model.ActionType(rawEvent.Type), rawEvent.CurrentSchemaName, tableInfo.Name.O) { + allFiltered = false + break + } + } + if allFiltered { + continue + } + } else { + if tableFilter.ShouldDiscardDDL(model.ActionType(rawEvent.Type), rawEvent.CurrentSchemaName, rawEvent.CurrentTableName) && + tableFilter.ShouldDiscardDDL(model.ActionType(rawEvent.Type), rawEvent.PrevSchemaName, rawEvent.PrevTableName) { + continue + } + } } events = append(events, buildDDLEvent(&rawEvent, tableFilter)) } @@ -421,7 +440,7 @@ func (p *persistentStorage) buildVersionedTableInfoStore( for _, version := range allDDLFinishedTs { ddlEvent := readPersistedDDLEvent(storageSnap, version) - store.applyDDLFromPersistStorage(ddlEvent) + store.applyDDLFromPersistStorage(&ddlEvent) } store.setTableInfoInitialized() return nil @@ -576,8 +595,7 @@ func (p *persistentStorage) persistUpperBoundPeriodically(ctx context.Context) e func (p *persistentStorage) handleDDLJob(job *model.Job) error { p.mu.Lock() - ddlEvent := buildPersistedDDLEventFromJob(job, p.databaseMap, p.tableMap) - // TODO: and some comment to explain why we need skik ddl here and why it is real rare + ddlEvent := buildPersistedDDLEventFromJob(job, p.databaseMap, p.tableMap, p.partitionMap) if shouldSkipDDL(&ddlEvent, p.databaseMap, p.tableMap) { p.mu.Unlock() return nil @@ -588,12 +606,18 @@ func (p *persistentStorage) handleDDLJob(job *model.Job) error { zap.Int64("schemaID", ddlEvent.CurrentSchemaID), zap.Int64("tableID", ddlEvent.CurrentTableID), zap.Uint64("finishedTs", ddlEvent.FinishedTs), + zap.String("ddlType", model.ActionType(ddlEvent.Type).String()), zap.String("query", ddlEvent.Query)) + // Note: need write ddl event to disk before update ddl history, + // becuase other goroutines may read ddl events from disk according to ddl history writePersistedDDLEvent(p.db, &ddlEvent) p.mu.Lock() var err error + // Note: `updateDDLHistory` must be before `updateDatabaseInfoAndTableInfo`, + // because `updateDDLHistory` will refer to the info in databaseMap and tableMap, + // and `updateDatabaseInfoAndTableInfo` may delete some info from databaseMap and tableMap if p.tableTriggerDDLHistory, err = updateDDLHistory( &ddlEvent, p.databaseMap, @@ -603,11 +627,11 @@ func (p *persistentStorage) handleDDLJob(job *model.Job) error { p.mu.Unlock() return err } - if err := updateDatabaseInfoAndTableInfo(&ddlEvent, p.databaseMap, p.tableMap); err != nil { + if err := updateDatabaseInfoAndTableInfo(&ddlEvent, p.databaseMap, p.tableMap, p.partitionMap); err != nil { p.mu.Unlock() return err } - if err := updateRegisteredTableInfoStore(ddlEvent, p.tableInfoStoreMap); err != nil { + if err := updateRegisteredTableInfoStore(&ddlEvent, p.tableInfoStoreMap); err != nil { p.mu.Unlock() return err } @@ -619,6 +643,7 @@ func buildPersistedDDLEventFromJob( job *model.Job, databaseMap map[int64]*BasicDatabaseInfo, tableMap map[int64]*BasicTableInfo, + partitionMap map[int64]BasicPartitionInfo, ) PersistedDDLEvent { getSchemaName := func(schemaID int64) string { databaseInfo, ok := databaseMap[schemaID] @@ -676,33 +701,75 @@ func buildPersistedDDLEventFromJob( model.ActionAddIndex, model.ActionDropIndex, model.ActionAddForeignKey, - model.ActionDropForeignKey, - model.ActionModifyColumn, - model.ActionRebaseAutoID, - model.ActionSetDefaultValue, - model.ActionShardRowID, - model.ActionModifyTableComment, - model.ActionRenameIndex: + model.ActionDropForeignKey: event.CurrentSchemaName = getSchemaName(event.CurrentSchemaID) event.CurrentTableName = getTableName(event.CurrentTableID) case model.ActionTruncateTable: - // only table id change + // only table id change after truncate event.PrevTableID = event.CurrentTableID event.CurrentTableID = event.TableInfo.ID event.CurrentSchemaName = getSchemaName(event.CurrentSchemaID) event.CurrentTableName = getTableName(event.PrevTableID) + if isPartitionTableEvent(&event) { + for id := range partitionMap[event.PrevTableID] { + event.PrevPartitions = append(event.PrevPartitions, id) + } + } + case model.ActionModifyColumn, + model.ActionRebaseAutoID: + event.CurrentSchemaName = getSchemaName(event.CurrentSchemaID) + event.CurrentTableName = getTableName(event.CurrentTableID) case model.ActionRenameTable: - // TODO: check the following fields is set correctly - // schema id/schema name/table name may be changed + // Note: schema id/schema name/table name may be changed or not + // table id does not change, we use it to get the table's prev schema id/name and table name event.PrevSchemaID = getSchemaID(event.CurrentTableID) event.PrevSchemaName = getSchemaName(event.PrevSchemaID) event.PrevTableName = getTableName(event.CurrentTableID) + // get the table's current schema name and table name from the ddl job event.CurrentSchemaName = getSchemaName(event.CurrentSchemaID) event.CurrentTableName = event.TableInfo.Name.O + case model.ActionSetDefaultValue, + model.ActionShardRowID, + model.ActionModifyTableComment, + model.ActionRenameIndex: + event.CurrentSchemaName = getSchemaName(event.CurrentSchemaID) + event.CurrentTableName = getTableName(event.CurrentTableID) + case model.ActionAddTablePartition, + model.ActionDropTablePartition: + event.CurrentSchemaName = getSchemaName(event.CurrentSchemaID) + event.CurrentTableName = getTableName(event.CurrentTableID) + for id := range partitionMap[event.CurrentTableID] { + event.PrevPartitions = append(event.PrevPartitions, id) + } case model.ActionCreateView: // ignore + case model.ActionTruncateTablePartition: + event.CurrentSchemaName = getSchemaName(event.CurrentSchemaID) + event.CurrentTableName = getTableName(event.CurrentTableID) + for id := range partitionMap[event.CurrentTableID] { + event.PrevPartitions = append(event.PrevPartitions, id) + } + case model.ActionExchangeTablePartition: + event.PrevSchemaID = event.CurrentSchemaID + event.PrevTableID = event.CurrentTableID + event.PrevSchemaName = getSchemaName(event.PrevSchemaID) + event.PrevTableName = getTableName(event.PrevTableID) + event.CurrentTableID = event.TableInfo.ID + event.CurrentSchemaID = getSchemaID(event.CurrentTableID) + event.CurrentSchemaName = getSchemaName(event.CurrentSchemaID) + event.CurrentTableName = getTableName(event.CurrentTableID) + for id := range partitionMap[event.CurrentTableID] { + event.PrevPartitions = append(event.PrevPartitions, id) + } case model.ActionCreateTables: - // FIXME: support create tables + event.CurrentSchemaName = getSchemaName(event.CurrentSchemaID) + event.MultipleTableInfos = job.BinlogInfo.MultipleTableInfos + case model.ActionReorganizePartition: + event.CurrentSchemaName = getSchemaName(event.CurrentSchemaID) + event.CurrentTableName = getTableName(event.CurrentTableID) + for id := range partitionMap[event.CurrentTableID] { + event.PrevPartitions = append(event.PrevPartitions, id) + } default: log.Panic("unknown ddl type", zap.Any("ddlType", event.Type), @@ -711,13 +778,13 @@ func buildPersistedDDLEventFromJob( return event } -// TODO: add some comment to explain why we should skip some ddl func shouldSkipDDL( event *PersistedDDLEvent, databaseMap map[int64]*BasicDatabaseInfo, tableMap map[int64]*BasicTableInfo, ) bool { switch model.ActionType(event.Type) { + // TODO: add some comment to explain why and when we should skip ActionCreateSchema/ActionCreateTable case model.ActionCreateSchema: if _, ok := databaseMap[event.CurrentSchemaID]; ok { log.Warn("database already exists. ignore DDL ", @@ -729,6 +796,7 @@ func shouldSkipDDL( return true } case model.ActionCreateTable: + // Note: partition table's logical table id is also in tableMap if _, ok := tableMap[event.CurrentTableID]; ok { log.Warn("table already exists. ignore DDL ", zap.String("DDL", event.Query), @@ -739,10 +807,28 @@ func shouldSkipDDL( zap.Int64("jobSchemaVersion", event.SchemaVersion)) return true } + case model.ActionAlterTableAttributes, + model.ActionAlterTablePartitionAttributes: + // Note: these ddls seems not useful to sync to downstream? + return true } + // Note: create tables don't need to be ignore, because we won't receive it twice return false } +func isPartitionTableEvent(ddlEvent *PersistedDDLEvent) bool { + // ddlEvent.TableInfo may only be nil in unit test + return ddlEvent.TableInfo != nil && ddlEvent.TableInfo.Partition != nil +} + +func getAllPartitionIDs(ddlEvent *PersistedDDLEvent) []int64 { + physicalIDs := make([]int64, 0, len(ddlEvent.TableInfo.Partition.Definitions)) + for _, partition := range ddlEvent.TableInfo.Partition.Definitions { + physicalIDs = append(physicalIDs, partition.ID) + } + return physicalIDs +} + func updateDDLHistory( ddlEvent *PersistedDDLEvent, databaseMap map[int64]*BasicDatabaseInfo, @@ -750,45 +836,109 @@ func updateDDLHistory( tablesDDLHistory map[int64][]uint64, tableTriggerDDLHistory []uint64, ) ([]uint64, error) { - addTableHistory := func(tableID int64) { + appendTableHistory := func(tableID int64) { tablesDDLHistory[tableID] = append(tablesDDLHistory[tableID], ddlEvent.FinishedTs) } + appendPartitionsHistory := func(partitionIDs []int64) { + for _, partitionID := range partitionIDs { + tablesDDLHistory[partitionID] = append(tablesDDLHistory[partitionID], ddlEvent.FinishedTs) + } + } switch model.ActionType(ddlEvent.Type) { - case model.ActionCreateSchema, - model.ActionCreateView: + case model.ActionCreateSchema: tableTriggerDDLHistory = append(tableTriggerDDLHistory, ddlEvent.FinishedTs) - for tableID := range tableMap { - addTableHistory(tableID) - } case model.ActionDropSchema: tableTriggerDDLHistory = append(tableTriggerDDLHistory, ddlEvent.FinishedTs) for tableID := range databaseMap[ddlEvent.CurrentSchemaID].Tables { - addTableHistory(tableID) + appendTableHistory(tableID) } case model.ActionCreateTable, model.ActionDropTable: tableTriggerDDLHistory = append(tableTriggerDDLHistory, ddlEvent.FinishedTs) - addTableHistory(ddlEvent.CurrentTableID) + // Note: for create table, this ddl event will not be sent to table dispatchers. + // add it to ddl history is just for building table info store. + if isPartitionTableEvent(ddlEvent) { + // for partition table, we only care the ddl history of physical table ids. + appendPartitionsHistory(getAllPartitionIDs(ddlEvent)) + } else { + appendTableHistory(ddlEvent.CurrentTableID) + } case model.ActionAddColumn, model.ActionDropColumn, model.ActionAddIndex, model.ActionDropIndex, model.ActionAddForeignKey, - model.ActionDropForeignKey, - model.ActionModifyColumn, - model.ActionRebaseAutoID, - model.ActionSetDefaultValue, + model.ActionDropForeignKey: + if isPartitionTableEvent(ddlEvent) { + appendPartitionsHistory(getAllPartitionIDs(ddlEvent)) + } else { + appendTableHistory(ddlEvent.CurrentTableID) + } + case model.ActionTruncateTable: + if isPartitionTableEvent(ddlEvent) { + appendPartitionsHistory(getAllPartitionIDs(ddlEvent)) + appendPartitionsHistory(ddlEvent.PrevPartitions) + } else { + appendTableHistory(ddlEvent.CurrentTableID) + appendTableHistory(ddlEvent.PrevTableID) + } + case model.ActionModifyColumn, + model.ActionRebaseAutoID: + if isPartitionTableEvent(ddlEvent) { + appendPartitionsHistory(getAllPartitionIDs(ddlEvent)) + } else { + appendTableHistory(ddlEvent.CurrentTableID) + } + case model.ActionRenameTable: + tableTriggerDDLHistory = append(tableTriggerDDLHistory, ddlEvent.FinishedTs) + if isPartitionTableEvent(ddlEvent) { + appendPartitionsHistory(getAllPartitionIDs(ddlEvent)) + } else { + appendTableHistory(ddlEvent.CurrentTableID) + } + case model.ActionSetDefaultValue, model.ActionShardRowID, model.ActionModifyTableComment, model.ActionRenameIndex: - addTableHistory(ddlEvent.CurrentTableID) - case model.ActionTruncateTable: - addTableHistory(ddlEvent.CurrentTableID) - addTableHistory(ddlEvent.PrevTableID) - case model.ActionRenameTable: + if isPartitionTableEvent(ddlEvent) { + appendPartitionsHistory(getAllPartitionIDs(ddlEvent)) + } else { + appendTableHistory(ddlEvent.CurrentTableID) + } + case model.ActionAddTablePartition: + // all partitions include newly create partitions will receive this event + appendPartitionsHistory(getAllPartitionIDs(ddlEvent)) + case model.ActionDropTablePartition: + // TODO: verify all partitions include dropped partitions will receive this event + appendPartitionsHistory(ddlEvent.PrevPartitions) + case model.ActionCreateView: + tableTriggerDDLHistory = append(tableTriggerDDLHistory, ddlEvent.FinishedTs) + for tableID := range tableMap { + appendTableHistory(tableID) + } + case model.ActionTruncateTablePartition: + appendPartitionsHistory(ddlEvent.PrevPartitions) + newCreateIDs := getCreatedIDs(ddlEvent.PrevPartitions, getAllPartitionIDs(ddlEvent)) + appendPartitionsHistory(newCreateIDs) + case model.ActionExchangeTablePartition: + droppedIDs := getDroppedIDs(ddlEvent.PrevPartitions, getAllPartitionIDs(ddlEvent)) + if len(droppedIDs) != 1 { + log.Panic("exchange table partition should only drop one partition", + zap.Int64s("droppedIDs", droppedIDs)) + } + appendTableHistory(ddlEvent.PrevTableID) + appendPartitionsHistory(droppedIDs) + case model.ActionCreateTables: tableTriggerDDLHistory = append(tableTriggerDDLHistory, ddlEvent.FinishedTs) - addTableHistory(ddlEvent.CurrentTableID) + // it won't be send to table dispatchers, just for build version store + for _, info := range ddlEvent.MultipleTableInfos { + appendTableHistory(info.ID) + } + case model.ActionReorganizePartition: + appendPartitionsHistory(ddlEvent.PrevPartitions) + newCreateIDs := getCreatedIDs(ddlEvent.PrevPartitions, getAllPartitionIDs(ddlEvent)) + appendPartitionsHistory(newCreateIDs) default: log.Panic("unknown ddl type", zap.Any("ddlType", ddlEvent.Type), @@ -802,6 +952,7 @@ func updateDatabaseInfoAndTableInfo( event *PersistedDDLEvent, databaseMap map[int64]*BasicDatabaseInfo, tableMap map[int64]*BasicTableInfo, + partitionMap map[int64]BasicPartitionInfo, ) error { addTableToDB := func(schemaID int64, tableID int64) { databaseInfo, ok := databaseMap[schemaID] @@ -853,24 +1004,45 @@ func updateDatabaseInfoAndTableInfo( case model.ActionDropSchema: for tableID := range databaseMap[event.CurrentSchemaID].Tables { delete(tableMap, tableID) + // TODO: test it + delete(partitionMap, tableID) } delete(databaseMap, event.CurrentSchemaID) case model.ActionCreateTable: createTable(event.CurrentSchemaID, event.CurrentTableID) + if isPartitionTableEvent(event) { + partitionInfo := make(BasicPartitionInfo) + for _, partition := range event.TableInfo.Partition.Definitions { + partitionInfo[partition.ID] = nil + } + partitionMap[event.CurrentTableID] = partitionInfo + } case model.ActionDropTable: dropTable(event.CurrentSchemaID, event.CurrentTableID) + if isPartitionTableEvent(event) { + delete(partitionMap, event.CurrentTableID) + } case model.ActionAddColumn, model.ActionDropColumn, model.ActionAddIndex, model.ActionDropIndex, model.ActionAddForeignKey, - model.ActionDropForeignKey, - model.ActionModifyColumn, - model.ActionRebaseAutoID: + model.ActionDropForeignKey: // ignore case model.ActionTruncateTable: dropTable(event.CurrentSchemaID, event.PrevTableID) createTable(event.CurrentSchemaID, event.CurrentTableID) + if isPartitionTableEvent(event) { + delete(partitionMap, event.PrevTableID) + partitionInfo := make(BasicPartitionInfo) + for _, partition := range event.TableInfo.Partition.Definitions { + partitionInfo[partition.ID] = nil + } + partitionMap[event.CurrentTableID] = partitionInfo + } + case model.ActionModifyColumn, + model.ActionRebaseAutoID: + // ignore case model.ActionRenameTable: if event.PrevSchemaID != event.CurrentSchemaID { tableMap[event.CurrentTableID].SchemaID = event.CurrentSchemaID @@ -881,12 +1053,60 @@ func updateDatabaseInfoAndTableInfo( case model.ActionSetDefaultValue, model.ActionShardRowID, model.ActionModifyTableComment, - model.ActionRenameIndex, - model.ActionCreateView: - // TODO - // seems can be ignored + model.ActionRenameIndex: + // TODO: verify can be ignored case model.ActionAddTablePartition: - // TODO + newCreatedIDs := getCreatedIDs(event.PrevPartitions, getAllPartitionIDs(event)) + for _, id := range newCreatedIDs { + partitionMap[event.CurrentTableID][id] = nil + } + case model.ActionDropTablePartition: + droppedIDs := getDroppedIDs(event.PrevPartitions, getAllPartitionIDs(event)) + for _, id := range droppedIDs { + delete(partitionMap[event.CurrentTableID], id) + } + case model.ActionCreateView: + // ignore + case model.ActionTruncateTablePartition: + physicalIDs := getAllPartitionIDs(event) + droppedIDs := getDroppedIDs(event.PrevPartitions, physicalIDs) + for _, id := range droppedIDs { + delete(partitionMap[event.CurrentTableID], id) + } + newCreatedIDs := getCreatedIDs(event.PrevPartitions, physicalIDs) + for _, id := range newCreatedIDs { + partitionMap[event.CurrentTableID][id] = nil + } + case model.ActionExchangeTablePartition: + physicalIDs := getAllPartitionIDs(event) + droppedIDs := getDroppedIDs(event.PrevPartitions, physicalIDs) + if len(droppedIDs) != 1 { + log.Panic("exchange table partition should only drop one partition", + zap.Int64s("droppedIDs", droppedIDs)) + } + targetPartitionID := droppedIDs[0] + dropTable(event.PrevSchemaID, event.PrevTableID) + createTable(event.PrevSchemaID, targetPartitionID) + delete(partitionMap[event.CurrentTableID], targetPartitionID) + partitionMap[event.CurrentTableID][event.PrevTableID] = nil + case model.ActionCreateTables: + for _, info := range event.MultipleTableInfos { + addTableToDB(event.CurrentSchemaID, info.ID) + tableMap[info.ID] = &BasicTableInfo{ + SchemaID: event.CurrentSchemaID, + Name: info.Name.O, + } + } + case model.ActionReorganizePartition: + physicalIDs := getAllPartitionIDs(event) + droppedIDs := getDroppedIDs(event.PrevPartitions, physicalIDs) + for _, id := range droppedIDs { + delete(partitionMap[event.CurrentTableID], id) + } + newCreatedIDs := getCreatedIDs(event.PrevPartitions, physicalIDs) + for _, id := range newCreatedIDs { + partitionMap[event.CurrentTableID][id] = nil + } default: log.Panic("unknown ddl type", zap.Any("ddlType", event.Type), @@ -897,38 +1117,154 @@ func updateDatabaseInfoAndTableInfo( } func updateRegisteredTableInfoStore( - event PersistedDDLEvent, + event *PersistedDDLEvent, tableInfoStoreMap map[int64]*versionedTableInfoStore, ) error { + tryApplyDDLToStore := func() { + if isPartitionTableEvent(event) { + allPhysicalIDs := getAllPartitionIDs(event) + for _, id := range allPhysicalIDs { + if store, ok := tableInfoStoreMap[id]; ok { + store.applyDDL(event) + } + } + } else { + if store, ok := tableInfoStoreMap[event.CurrentTableID]; ok { + store.applyDDL(event) + } + } + } + switch model.ActionType(event.Type) { case model.ActionCreateSchema, - model.ActionDropSchema, - model.ActionCreateTable, - model.ActionAddIndex, - model.ActionDropIndex, - model.ActionAddForeignKey, - model.ActionDropForeignKey, - model.ActionRenameTable, - model.ActionCreateView: + model.ActionDropSchema: // ignore + case model.ActionCreateTable: + if isPartitionTableEvent(event) { + allPhysicalIDs := getAllPartitionIDs(event) + for _, id := range allPhysicalIDs { + if _, ok := tableInfoStoreMap[event.CurrentTableID]; ok { + log.Panic("newly created tables should not be registered", + zap.Int64("tableID", id)) + } + } + } else { + if _, ok := tableInfoStoreMap[event.CurrentTableID]; ok { + log.Panic("newly created tables should not be registered", + zap.Int64("tableID", event.CurrentTableID)) + } + } case model.ActionDropTable, model.ActionAddColumn, - model.ActionDropColumn, - model.ActionModifyColumn, - model.ActionRebaseAutoID, - model.ActionSetDefaultValue, - model.ActionShardRowID, + model.ActionDropColumn: + tryApplyDDLToStore() + case model.ActionAddIndex, + model.ActionDropIndex, + model.ActionAddForeignKey, + model.ActionDropForeignKey: + // ignore + case model.ActionTruncateTable: + if isPartitionTableEvent(event) { + for _, id := range event.PrevPartitions { + if store, ok := tableInfoStoreMap[id]; ok { + store.applyDDL(event) + } + } + allPhysicalIDs := getAllPartitionIDs(event) + for _, id := range allPhysicalIDs { + if _, ok := tableInfoStoreMap[id]; ok { + log.Panic("newly created tables should not be registered", + zap.Int64("tableID", event.CurrentTableID)) + } + } + } else { + if store, ok := tableInfoStoreMap[event.PrevTableID]; ok { + store.applyDDL(event) + } + if _, ok := tableInfoStoreMap[event.CurrentTableID]; ok { + log.Panic("newly created tables should not be registered", + zap.Int64("tableID", event.CurrentTableID)) + } + } + case model.ActionModifyColumn: + tryApplyDDLToStore() + case model.ActionRebaseAutoID: + // TODO: verify can be ignored + case model.ActionRenameTable: + // ignore + case model.ActionSetDefaultValue: + tryApplyDDLToStore() + case model.ActionShardRowID, model.ActionModifyTableComment, model.ActionRenameIndex: - if store, ok := tableInfoStoreMap[event.CurrentTableID]; ok { + // TODO: verify can be ignored + case model.ActionAddTablePartition: + newCreatedIDs := getCreatedIDs(event.PrevPartitions, getAllPartitionIDs(event)) + for _, id := range newCreatedIDs { + if _, ok := tableInfoStoreMap[id]; ok { + log.Panic("newly created partitions should not be registered", + zap.Int64("partitionID", id)) + } + } + case model.ActionDropTablePartition: + droppedIDs := getDroppedIDs(event.PrevPartitions, getAllPartitionIDs(event)) + for _, id := range droppedIDs { + if store, ok := tableInfoStoreMap[id]; ok { + store.applyDDL(event) + } + } + case model.ActionCreateView: + // ignore + case model.ActionTruncateTablePartition: + physicalIDs := getAllPartitionIDs(event) + droppedIDs := getDroppedIDs(event.PrevPartitions, physicalIDs) + for _, id := range droppedIDs { + if store, ok := tableInfoStoreMap[id]; ok { + store.applyDDL(event) + } + } + newCreatedIDs := getCreatedIDs(event.PrevPartitions, physicalIDs) + for _, id := range newCreatedIDs { + if _, ok := tableInfoStoreMap[id]; ok { + log.Panic("newly created partitions should not be registered", + zap.Int64("partitionID", id)) + } + } + case model.ActionExchangeTablePartition: + physicalIDs := getAllPartitionIDs(event) + droppedIDs := getDroppedIDs(event.PrevPartitions, physicalIDs) + if len(droppedIDs) != 1 { + log.Panic("exchange table partition should only drop one partition", + zap.Int64s("droppedIDs", droppedIDs)) + } + targetPartitionID := droppedIDs[0] + if store, ok := tableInfoStoreMap[targetPartitionID]; ok { store.applyDDL(event) } - case model.ActionTruncateTable: if store, ok := tableInfoStoreMap[event.PrevTableID]; ok { store.applyDDL(event) } - if store, ok := tableInfoStoreMap[event.CurrentTableID]; ok { - store.applyDDL(event) + case model.ActionCreateTables: + for _, info := range event.MultipleTableInfos { + if _, ok := tableInfoStoreMap[info.ID]; ok { + log.Panic("newly created tables should not be registered", + zap.Int64("tableID", info.ID)) + } + } + case model.ActionReorganizePartition: + physicalIDs := getAllPartitionIDs(event) + droppedIDs := getDroppedIDs(event.PrevPartitions, physicalIDs) + for _, id := range droppedIDs { + if store, ok := tableInfoStoreMap[id]; ok { + store.applyDDL(event) + } + } + newCreatedIDs := getCreatedIDs(event.PrevPartitions, physicalIDs) + for _, id := range newCreatedIDs { + if _, ok := tableInfoStoreMap[id]; ok { + log.Panic("newly created partitions should not be registered", + zap.Int64("partitionID", id)) + } } default: log.Panic("unknown ddl type", @@ -938,35 +1274,40 @@ func updateRegisteredTableInfoStore( return nil } +func getCreatedIDs(oldIDs []int64, newIDs []int64) []int64 { + oldIDsMap := make(map[int64]interface{}, len(oldIDs)) + for _, id := range oldIDs { + oldIDsMap[id] = nil + } + createdIDs := make([]int64, 0) + for _, id := range newIDs { + if _, ok := oldIDsMap[id]; !ok { + createdIDs = append(createdIDs, id) + } + } + return createdIDs +} +func getDroppedIDs(oldIDs []int64, newIDs []int64) []int64 { + return getCreatedIDs(newIDs, oldIDs) +} + func buildDDLEvent(rawEvent *PersistedDDLEvent, tableFilter filter.Filter) common.DDLEvent { ddlEvent := common.DDLEvent{ Type: rawEvent.Type, - // TODO: whether the following fields are needed + // TODO: whether the following four fields are needed SchemaID: rawEvent.CurrentSchemaID, TableID: rawEvent.CurrentTableID, SchemaName: rawEvent.CurrentSchemaName, TableName: rawEvent.CurrentTableName, + Query: rawEvent.Query, TableInfo: rawEvent.TableInfo, FinishedTs: rawEvent.FinishedTs, TiDBOnly: false, } - // TODO: remove schema id when influcence type is normal - // TODO: respect filter for create table / drop table and more ddls + switch model.ActionType(rawEvent.Type) { - case model.ActionCreateSchema, - model.ActionAddColumn, - model.ActionDropColumn, - model.ActionAddIndex, - model.ActionDropIndex, - model.ActionAddForeignKey, - model.ActionDropForeignKey, - model.ActionModifyColumn, - model.ActionRebaseAutoID, - model.ActionSetDefaultValue, - model.ActionShardRowID, - model.ActionModifyTableComment, - model.ActionRenameIndex: + case model.ActionCreateSchema: // ignore case model.ActionDropSchema: ddlEvent.NeedDroppedTables = &common.InfluencedTables{ @@ -977,12 +1318,22 @@ func buildDDLEvent(rawEvent *PersistedDDLEvent, tableFilter filter.Filter) commo DropDatabaseName: rawEvent.CurrentSchemaName, } case model.ActionCreateTable: - // TODO: support create partition table - ddlEvent.NeedAddedTables = []common.Table{ - { - SchemaID: rawEvent.CurrentSchemaID, - TableID: rawEvent.CurrentTableID, - }, + if isPartitionTableEvent(rawEvent) { + physicalIDs := getAllPartitionIDs(rawEvent) + ddlEvent.NeedAddedTables = make([]common.Table, 0, len(physicalIDs)) + for _, id := range physicalIDs { + ddlEvent.NeedAddedTables = append(ddlEvent.NeedAddedTables, common.Table{ + SchemaID: rawEvent.CurrentSchemaID, + TableID: id, + }) + } + } else { + ddlEvent.NeedAddedTables = []common.Table{ + { + SchemaID: rawEvent.CurrentSchemaID, + TableID: rawEvent.CurrentTableID, + }, + } } ddlEvent.TableNameChange = &common.TableNameChange{ AddName: []common.SchemaTableName{ @@ -993,15 +1344,28 @@ func buildDDLEvent(rawEvent *PersistedDDLEvent, tableFilter filter.Filter) commo }, } case model.ActionDropTable: - ddlEvent.BlockedTables = &common.InfluencedTables{ - InfluenceType: common.InfluenceTypeNormal, - TableIDs: []int64{rawEvent.CurrentTableID, heartbeatpb.DDLSpan.TableID}, - SchemaID: rawEvent.CurrentSchemaID, - } - ddlEvent.NeedDroppedTables = &common.InfluencedTables{ - InfluenceType: common.InfluenceTypeNormal, - TableIDs: []int64{rawEvent.CurrentTableID}, - SchemaID: rawEvent.CurrentSchemaID, + if isPartitionTableEvent(rawEvent) { + allPhysicalTableIDs := getAllPartitionIDs(rawEvent) + allPhysicalTableIDsAndDDLSpanID := make([]int64, 0, len(rawEvent.TableInfo.Partition.Definitions)+1) + allPhysicalTableIDsAndDDLSpanID = append(allPhysicalTableIDsAndDDLSpanID, allPhysicalTableIDs...) + allPhysicalTableIDsAndDDLSpanID = append(allPhysicalTableIDsAndDDLSpanID, heartbeatpb.DDLSpan.TableID) + ddlEvent.BlockedTables = &common.InfluencedTables{ + InfluenceType: common.InfluenceTypeNormal, + TableIDs: allPhysicalTableIDsAndDDLSpanID, + } + ddlEvent.NeedDroppedTables = &common.InfluencedTables{ + InfluenceType: common.InfluenceTypeNormal, + TableIDs: allPhysicalTableIDs, + } + } else { + ddlEvent.BlockedTables = &common.InfluencedTables{ + InfluenceType: common.InfluenceTypeNormal, + TableIDs: []int64{rawEvent.CurrentTableID, heartbeatpb.DDLSpan.TableID}, + } + ddlEvent.NeedDroppedTables = &common.InfluencedTables{ + InfluenceType: common.InfluenceTypeNormal, + TableIDs: []int64{rawEvent.CurrentTableID}, + } } ddlEvent.TableNameChange = &common.TableNameChange{ DropName: []common.SchemaTableName{ @@ -1011,63 +1375,340 @@ func buildDDLEvent(rawEvent *PersistedDDLEvent, tableFilter filter.Filter) commo }, }, } + case model.ActionAddColumn, + model.ActionDropColumn, + model.ActionAddIndex, + model.ActionDropIndex, + model.ActionAddForeignKey, + model.ActionDropForeignKey: + // ignore case model.ActionTruncateTable: - ddlEvent.NeedDroppedTables = &common.InfluencedTables{ - InfluenceType: common.InfluenceTypeNormal, - TableIDs: []int64{rawEvent.PrevTableID}, - SchemaID: rawEvent.CurrentSchemaID, - } - ddlEvent.NeedAddedTables = []common.Table{ - { - SchemaID: rawEvent.CurrentSchemaID, - TableID: rawEvent.CurrentTableID, - }, + if isPartitionTableEvent(rawEvent) { + if len(rawEvent.PrevPartitions) > 1 { + // if more than one partitions, we need block them + ddlEvent.BlockedTables = &common.InfluencedTables{ + InfluenceType: common.InfluenceTypeNormal, + TableIDs: rawEvent.PrevPartitions, + } + } + // Note: for truncate table, prev partitions must all be dropped. + ddlEvent.NeedDroppedTables = &common.InfluencedTables{ + InfluenceType: common.InfluenceTypeNormal, + TableIDs: rawEvent.PrevPartitions, + } + physicalIDs := getAllPartitionIDs(rawEvent) + ddlEvent.NeedAddedTables = make([]common.Table, 0, len(physicalIDs)) + for _, id := range physicalIDs { + ddlEvent.NeedAddedTables = append(ddlEvent.NeedAddedTables, common.Table{ + SchemaID: rawEvent.CurrentSchemaID, + TableID: id, + }) + } + } else { + ddlEvent.NeedDroppedTables = &common.InfluencedTables{ + InfluenceType: common.InfluenceTypeNormal, + TableIDs: []int64{rawEvent.PrevTableID}, + } + ddlEvent.NeedAddedTables = []common.Table{ + { + SchemaID: rawEvent.CurrentSchemaID, + TableID: rawEvent.CurrentTableID, + }, + } } + case model.ActionModifyColumn, + model.ActionRebaseAutoID: + // ignore case model.ActionRenameTable: ignorePrevTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.PrevSchemaName, rawEvent.PrevTableName) ignoreCurrentTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.CurrentSchemaName, rawEvent.CurrentTableName) - var addName, dropName []common.SchemaTableName - if !ignorePrevTable { + if isPartitionTableEvent(rawEvent) { + allPhysicalIDs := getAllPartitionIDs(rawEvent) + if !ignorePrevTable { + allPhysicalIDsAndDDLSpanID := make([]int64, 0, len(allPhysicalIDs)+1) + allPhysicalIDsAndDDLSpanID = append(allPhysicalIDsAndDDLSpanID, allPhysicalIDs...) + allPhysicalIDsAndDDLSpanID = append(allPhysicalIDsAndDDLSpanID, heartbeatpb.DDLSpan.TableID) + ddlEvent.BlockedTables = &common.InfluencedTables{ + InfluenceType: common.InfluenceTypeNormal, + TableIDs: allPhysicalIDsAndDDLSpanID, + } + if !ignoreCurrentTable { + // check whether schema change + if rawEvent.PrevSchemaID != rawEvent.CurrentSchemaID { + ddlEvent.UpdatedSchemas = make([]common.SchemaIDChange, 0, len(allPhysicalIDs)) + for _, id := range allPhysicalIDs { + ddlEvent.UpdatedSchemas = append(ddlEvent.UpdatedSchemas, common.SchemaIDChange{ + TableID: id, + OldSchemaID: rawEvent.PrevSchemaID, + NewSchemaID: rawEvent.CurrentSchemaID, + }) + } + } + } else { + // the table is filtered out after rename table, we need drop the table + ddlEvent.NeedDroppedTables = &common.InfluencedTables{ + InfluenceType: common.InfluenceTypeNormal, + TableIDs: allPhysicalIDs, + } + ddlEvent.TableNameChange = &common.TableNameChange{ + DropName: []common.SchemaTableName{ + { + SchemaName: rawEvent.PrevSchemaName, + TableName: rawEvent.PrevTableName, + }, + }, + } + } + } else if !ignoreCurrentTable { + // the table is filtered out before rename table, we need add table here + ddlEvent.NeedAddedTables = []common.Table{ + { + SchemaID: rawEvent.CurrentSchemaID, + TableID: rawEvent.CurrentTableID, + }, + } + ddlEvent.NeedAddedTables = make([]common.Table, 0, len(allPhysicalIDs)) + for _, id := range allPhysicalIDs { + ddlEvent.NeedAddedTables = append(ddlEvent.NeedAddedTables, common.Table{ + SchemaID: rawEvent.CurrentSchemaID, + TableID: id, + }) + } + ddlEvent.TableNameChange = &common.TableNameChange{ + AddName: []common.SchemaTableName{ + { + SchemaName: rawEvent.CurrentSchemaName, + TableName: rawEvent.CurrentTableName, + }, + }, + } + } else { + // if the table is both filtered out before and after rename table, the ddl should not be fetched + log.Panic("should not build a ignored rename table ddl", + zap.String("DDL", rawEvent.Query), + zap.Int64("jobID", rawEvent.ID), + zap.Int64("schemaID", rawEvent.CurrentSchemaID), + zap.Int64("tableID", rawEvent.CurrentTableID)) + } + } else { + if !ignorePrevTable { + ddlEvent.BlockedTables = &common.InfluencedTables{ + InfluenceType: common.InfluenceTypeNormal, + TableIDs: []int64{rawEvent.CurrentTableID, heartbeatpb.DDLSpan.TableID}, + } + if !ignoreCurrentTable { + if rawEvent.PrevSchemaID != rawEvent.CurrentSchemaID { + ddlEvent.UpdatedSchemas = []common.SchemaIDChange{ + { + TableID: rawEvent.CurrentTableID, + OldSchemaID: rawEvent.PrevSchemaID, + NewSchemaID: rawEvent.CurrentSchemaID, + }, + } + } + } else { + // the table is filtered out after rename table, we need drop the table + ddlEvent.NeedDroppedTables = &common.InfluencedTables{ + InfluenceType: common.InfluenceTypeNormal, + TableIDs: []int64{rawEvent.CurrentTableID}, + } + ddlEvent.TableNameChange = &common.TableNameChange{ + DropName: []common.SchemaTableName{ + { + SchemaName: rawEvent.PrevSchemaName, + TableName: rawEvent.PrevTableName, + }, + }, + } + } + } else if !ignoreCurrentTable { + // the table is filtered out before rename table, we need add table here + ddlEvent.NeedAddedTables = []common.Table{ + { + SchemaID: rawEvent.CurrentSchemaID, + TableID: rawEvent.CurrentTableID, + }, + } + ddlEvent.TableNameChange = &common.TableNameChange{ + AddName: []common.SchemaTableName{ + { + SchemaName: rawEvent.CurrentSchemaName, + TableName: rawEvent.CurrentTableName, + }, + }, + } + } else { + // if the table is both filtered out before and after rename table, the ddl should not be fetched + log.Panic("should not build a ignored rename table ddl", + zap.String("DDL", rawEvent.Query), + zap.Int64("jobID", rawEvent.ID), + zap.Int64("schemaID", rawEvent.CurrentSchemaID), + zap.Int64("tableID", rawEvent.CurrentTableID)) + } + } + case model.ActionSetDefaultValue, + model.ActionShardRowID, + model.ActionModifyTableComment, + model.ActionRenameIndex: + // ignore + case model.ActionAddTablePartition: + if len(rawEvent.PrevPartitions) > 1 { ddlEvent.BlockedTables = &common.InfluencedTables{ InfluenceType: common.InfluenceTypeNormal, - TableIDs: []int64{rawEvent.CurrentTableID, heartbeatpb.DDLSpan.TableID}, - SchemaID: rawEvent.PrevSchemaID, + TableIDs: rawEvent.PrevPartitions, } - ddlEvent.NeedDroppedTables = &common.InfluencedTables{ + } + physicalIDs := getAllPartitionIDs(rawEvent) + newCreatedIDs := getCreatedIDs(rawEvent.PrevPartitions, physicalIDs) + ddlEvent.NeedAddedTables = make([]common.Table, 0, len(newCreatedIDs)) + for _, id := range newCreatedIDs { + ddlEvent.NeedAddedTables = append(ddlEvent.NeedAddedTables, common.Table{ + SchemaID: rawEvent.CurrentSchemaID, + TableID: id, + }) + } + case model.ActionDropTablePartition: + if len(rawEvent.PrevPartitions) > 1 { + ddlEvent.BlockedTables = &common.InfluencedTables{ InfluenceType: common.InfluenceTypeNormal, - TableIDs: []int64{rawEvent.CurrentTableID}, - SchemaID: rawEvent.PrevSchemaID, + TableIDs: rawEvent.PrevPartitions, + } + } + physicalIDs := getAllPartitionIDs(rawEvent) + droppedIDs := getDroppedIDs(rawEvent.PrevPartitions, physicalIDs) + ddlEvent.NeedDroppedTables = &common.InfluencedTables{ + InfluenceType: common.InfluenceTypeNormal, + TableIDs: droppedIDs, + } + case model.ActionCreateView: + ddlEvent.BlockedTables = &common.InfluencedTables{ + InfluenceType: common.InfluenceTypeAll, + } + case model.ActionTruncateTablePartition: + if len(rawEvent.PrevPartitions) > 1 { + ddlEvent.BlockedTables = &common.InfluencedTables{ + InfluenceType: common.InfluenceTypeNormal, + TableIDs: rawEvent.PrevPartitions, } - dropName = append(dropName, common.SchemaTableName{ - SchemaName: rawEvent.PrevSchemaName, - TableName: rawEvent.PrevTableName, + } + physicalIDs := getAllPartitionIDs(rawEvent) + newCreatedIDs := getCreatedIDs(rawEvent.PrevPartitions, physicalIDs) + for _, id := range newCreatedIDs { + ddlEvent.NeedAddedTables = append(ddlEvent.NeedAddedTables, common.Table{ + SchemaID: rawEvent.CurrentSchemaID, + TableID: id, }) } - if !ignoreCurrentTable { + droppedIDs := getDroppedIDs(rawEvent.PrevPartitions, physicalIDs) + ddlEvent.NeedDroppedTables = &common.InfluencedTables{ + InfluenceType: common.InfluenceTypeNormal, + TableIDs: droppedIDs, + } + case model.ActionExchangeTablePartition: + ignoreNormalTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.PrevSchemaName, rawEvent.PrevTableName) + ignorePartitionTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.CurrentSchemaName, rawEvent.CurrentTableName) + physicalIDs := getAllPartitionIDs(rawEvent) + droppedIDs := getDroppedIDs(rawEvent.PrevPartitions, physicalIDs) + if len(droppedIDs) != 1 { + log.Panic("exchange table partition should only drop one partition", + zap.Int64s("droppedIDs", droppedIDs)) + } + targetPartitionID := droppedIDs[0] + if !ignoreNormalTable && !ignorePartitionTable { + ddlEvent.BlockedTables = &common.InfluencedTables{ + InfluenceType: common.InfluenceTypeNormal, + TableIDs: []int64{rawEvent.PrevTableID, targetPartitionID}, + } + ddlEvent.UpdatedSchemas = []common.SchemaIDChange{ + { + TableID: targetPartitionID, + OldSchemaID: rawEvent.CurrentSchemaID, + NewSchemaID: rawEvent.PrevSchemaID, + }, + { + TableID: rawEvent.PrevTableID, + OldSchemaID: rawEvent.PrevSchemaID, + NewSchemaID: rawEvent.CurrentSchemaID, + }, + } + } else if !ignoreNormalTable { + // just one table, no need to block + ddlEvent.NeedDroppedTables = &common.InfluencedTables{ + InfluenceType: common.InfluenceTypeNormal, + TableIDs: []int64{rawEvent.PrevTableID}, + } + ddlEvent.NeedAddedTables = []common.Table{ + { + SchemaID: rawEvent.PrevSchemaID, + TableID: targetPartitionID, + }, + } + } else if !ignorePartitionTable { + // just one table, no need to block + ddlEvent.NeedDroppedTables = &common.InfluencedTables{ + InfluenceType: common.InfluenceTypeNormal, + TableIDs: []int64{targetPartitionID}, + } ddlEvent.NeedAddedTables = []common.Table{ { SchemaID: rawEvent.CurrentSchemaID, - TableID: rawEvent.CurrentTableID, + TableID: rawEvent.PrevTableID, }, } + } else { + log.Fatal("should not happen") + } + case model.ActionCreateTables: + ddlEvent.NeedAddedTables = make([]common.Table, 0, len(rawEvent.MultipleTableInfos)) + addName := make([]common.SchemaTableName, 0, len(rawEvent.MultipleTableInfos)) + querys := strings.Split(rawEvent.Query, ";") + resultQuerys := make([]string, 0, len(rawEvent.MultipleTableInfos)) + for i := range rawEvent.MultipleTableInfos { + if tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.CurrentSchemaName, rawEvent.MultipleTableInfos[i].Name.O) { + continue + } + ddlEvent.NeedAddedTables = append(ddlEvent.NeedAddedTables, common.Table{ + SchemaID: rawEvent.CurrentSchemaID, + TableID: rawEvent.MultipleTableInfos[i].ID, + }) addName = append(addName, common.SchemaTableName{ SchemaName: rawEvent.CurrentSchemaName, - TableName: rawEvent.CurrentTableName, + TableName: rawEvent.MultipleTableInfos[i].Name.O, }) + resultQuerys = append(resultQuerys, querys[i]) } ddlEvent.TableNameChange = &common.TableNameChange{ - AddName: addName, - DropName: dropName, + AddName: addName, } - case model.ActionCreateView: - ddlEvent.BlockedTables = &common.InfluencedTables{ - InfluenceType: common.InfluenceTypeAll, + ddlEvent.Query = strings.Join(resultQuerys, ";") + if len(ddlEvent.NeedAddedTables) == 0 { + log.Fatal("should not happen") + } + case model.ActionReorganizePartition: + // same as truncate partition + if len(rawEvent.PrevPartitions) > 1 { + ddlEvent.BlockedTables = &common.InfluencedTables{ + InfluenceType: common.InfluenceTypeNormal, + TableIDs: rawEvent.PrevPartitions, + } + } + physicalIDs := getAllPartitionIDs(rawEvent) + newCreatedIDs := getCreatedIDs(rawEvent.PrevPartitions, physicalIDs) + for _, id := range newCreatedIDs { + ddlEvent.NeedAddedTables = append(ddlEvent.NeedAddedTables, common.Table{ + SchemaID: rawEvent.CurrentSchemaID, + TableID: id, + }) + } + droppedIDs := getDroppedIDs(rawEvent.PrevPartitions, physicalIDs) + ddlEvent.NeedDroppedTables = &common.InfluencedTables{ + InfluenceType: common.InfluenceTypeNormal, + TableIDs: droppedIDs, } default: log.Panic("unknown ddl type", zap.Any("ddlType", rawEvent.Type), zap.String("DDL", rawEvent.Query)) } - return ddlEvent } diff --git a/logservice/schemastore/persist_storage_test.go b/logservice/schemastore/persist_storage_test.go index 02a332cd5..bfed5655c 100644 --- a/logservice/schemastore/persist_storage_test.go +++ b/logservice/schemastore/persist_storage_test.go @@ -48,6 +48,7 @@ func loadPersistentStorageForTest(db *pebble.DB, gcTs uint64, upperBound UpperBo return p } +// create an empty persistent storage at dbPath func newEmptyPersistentStorageForTest(dbPath string) *persistentStorage { db, err := pebble.Open(dbPath, &pebble.Options{}) if err != nil { @@ -62,6 +63,7 @@ func newEmptyPersistentStorageForTest(dbPath string) *persistentStorage { return loadPersistentStorageForTest(db, gcTs, upperBound) } +// create a persistent storage with initial db info and table info func newPersistentStorageForTest(dbPath string, gcTs uint64, initialDBInfos map[int64]*model.DBInfo) *persistentStorage { db, err := pebble.Open(dbPath, &pebble.Options{}) if err != nil { @@ -246,7 +248,7 @@ func TestBuildVersionedTableInfoStore(t *testing.T) { require.Equal(t, "t2", tableInfo2.Name.O) renameVersion2 := uint64(3000) - store.applyDDL(PersistedDDLEvent{ + store.applyDDL(&PersistedDDLEvent{ Type: byte(model.ActionRenameTable), CurrentSchemaID: schemaID, CurrentTableID: tableID, @@ -311,6 +313,123 @@ func TestBuildVersionedTableInfoStore(t *testing.T) { } } +func TestBuildVersionedTableInfoStoreWithPartitionTable(t *testing.T) { + dbPath := fmt.Sprintf("/tmp/testdb-%s", t.Name()) + err := os.RemoveAll(dbPath) + require.Nil(t, err) + + gcTs := uint64(1000) + schemaID := int64(50) + tableID := int64(99) + databaseInfo := make(map[int64]*model.DBInfo) + databaseInfo[schemaID] = &model.DBInfo{ + ID: schemaID, + Name: model.NewCIStr("test"), + } + pStorage := newPersistentStorageForTest(dbPath, gcTs, databaseInfo) + + // create a partition table + partitionID1 := tableID + 100 + partitionID2 := tableID + 200 + { + job := &model.Job{ + Type: model.ActionCreateTable, + SchemaID: schemaID, + TableID: tableID, + BinlogInfo: &model.HistoryInfo{ + SchemaVersion: 2000, + TableInfo: &model.TableInfo{ + ID: tableID, + Name: model.NewCIStr("t"), + Partition: &model.PartitionInfo{ + Definitions: []model.PartitionDefinition{ + { + ID: partitionID1, + }, + { + ID: partitionID2, + }, + }, + }, + }, + FinishedTS: 1200, + }, + } + pStorage.handleDDLJob(job) + } + + upperBound := UpperBoundMeta{ + FinishedDDLTs: 3000, + SchemaVersion: 4000, + ResolvedTs: 2000, + } + pStorage = loadPersistentStorageForTest(pStorage.db, gcTs, upperBound) + { + store := newEmptyVersionedTableInfoStore(partitionID1) + pStorage.buildVersionedTableInfoStore(store) + require.Equal(t, 1, len(store.infos)) + } + { + store := newEmptyVersionedTableInfoStore(partitionID2) + pStorage.buildVersionedTableInfoStore(store) + require.Equal(t, 1, len(store.infos)) + } + + // truncate the partition table + tableID2 := tableID + 500 + partitionID3 := tableID2 + 100 + partitionID4 := tableID2 + 200 + { + job := &model.Job{ + Type: model.ActionTruncateTable, + SchemaID: schemaID, + TableID: tableID, + BinlogInfo: &model.HistoryInfo{ + SchemaVersion: 2100, + TableInfo: &model.TableInfo{ + ID: tableID2, + Name: model.NewCIStr("t"), + Partition: &model.PartitionInfo{ + Definitions: []model.PartitionDefinition{ + { + ID: partitionID3, + }, + { + ID: partitionID4, + }, + }, + }, + }, + FinishedTS: 1300, + }, + } + pStorage.handleDDLJob(job) + } + + { + store := newEmptyVersionedTableInfoStore(partitionID1) + pStorage.buildVersionedTableInfoStore(store) + require.Equal(t, 1, len(store.infos)) + require.Equal(t, uint64(1300), store.deleteVersion) + } + { + store := newEmptyVersionedTableInfoStore(partitionID2) + pStorage.buildVersionedTableInfoStore(store) + require.Equal(t, 1, len(store.infos)) + require.Equal(t, uint64(1300), store.deleteVersion) + } + { + store := newEmptyVersionedTableInfoStore(partitionID3) + pStorage.buildVersionedTableInfoStore(store) + require.Equal(t, 1, len(store.infos)) + } + { + store := newEmptyVersionedTableInfoStore(partitionID4) + pStorage.buildVersionedTableInfoStore(store) + require.Equal(t, 1, len(store.infos)) + } +} + func TestHandleCreateDropSchemaTableDDL(t *testing.T) { dbPath := fmt.Sprintf("/tmp/testdb-%s", t.Name()) err := os.RemoveAll(dbPath) @@ -460,18 +579,659 @@ func TestHandleCreateDropSchemaTableDDL(t *testing.T) { }, } - pStorage.handleDDLJob(job) + pStorage.handleDDLJob(job) + + require.Equal(t, 0, len(pStorage.databaseMap)) + require.Equal(t, 0, len(pStorage.tableMap)) + require.Equal(t, 5, len(pStorage.tableTriggerDDLHistory)) + require.Equal(t, uint64(300), pStorage.tableTriggerDDLHistory[4]) + require.Equal(t, 3, len(pStorage.tablesDDLHistory)) + require.Equal(t, 2, len(pStorage.tablesDDLHistory[tableID])) + require.Equal(t, 2, len(pStorage.tablesDDLHistory[tableID2])) + require.Equal(t, 2, len(pStorage.tablesDDLHistory[tableID3])) + require.Equal(t, uint64(300), pStorage.tablesDDLHistory[tableID3][1]) + } +} + +func TestPartitionTableDDL(t *testing.T) { + dbPath := fmt.Sprintf("/tmp/testdb-%s", t.Name()) + err := os.RemoveAll(dbPath) + require.Nil(t, err) + + gcTs := uint64(100) + schemaID := int64(300) + databaseInfo := make(map[int64]*model.DBInfo) + databaseInfo[schemaID] = &model.DBInfo{ + ID: schemaID, + Name: model.NewCIStr("test"), + } + pStorage := newPersistentStorageForTest(dbPath, gcTs, databaseInfo) + + // create a partition table + tableID := int64(100) + partitionID1 := tableID + 100 + partitionID2 := tableID + 200 + partitionID3 := tableID + 300 + { + job := &model.Job{ + Type: model.ActionCreateTable, + SchemaID: schemaID, + TableID: tableID, + BinlogInfo: &model.HistoryInfo{ + SchemaVersion: 101, + TableInfo: &model.TableInfo{ + ID: tableID, + Name: model.NewCIStr("t"), + Partition: &model.PartitionInfo{ + Definitions: []model.PartitionDefinition{ + { + ID: partitionID1, + }, + { + ID: partitionID2, + }, + { + ID: partitionID3, + }, + }, + }, + }, + FinishedTS: 201, + }, + } + pStorage.handleDDLJob(job) + + require.Equal(t, 1, len(pStorage.databaseMap[schemaID].Tables)) + require.Equal(t, 1, len(pStorage.tableMap)) + require.Equal(t, 3, len(pStorage.partitionMap[tableID])) + require.Equal(t, 1, len(pStorage.tablesDDLHistory[partitionID1])) + require.Equal(t, 1, len(pStorage.tablesDDLHistory[partitionID2])) + require.Equal(t, 1, len(pStorage.tablesDDLHistory[partitionID3])) + } + + // truncate the partition table + tableID2 := tableID + 400 + partitionID4 := tableID2 + 100 + partitionID5 := tableID2 + 200 + partitionID6 := tableID2 + 300 + { + job := &model.Job{ + Type: model.ActionTruncateTable, + SchemaID: schemaID, + TableID: tableID, + BinlogInfo: &model.HistoryInfo{ + SchemaVersion: 107, + TableInfo: &model.TableInfo{ + ID: tableID2, + Name: model.NewCIStr("t"), + Partition: &model.PartitionInfo{ + Definitions: []model.PartitionDefinition{ + { + ID: partitionID4, + }, + { + ID: partitionID5, + }, + { + ID: partitionID6, + }, + }, + }, + }, + FinishedTS: 207, + }, + } + pStorage.handleDDLJob(job) + + require.Equal(t, 1, len(pStorage.databaseMap[schemaID].Tables)) + require.Equal(t, 1, len(pStorage.tableMap)) + require.Equal(t, 1, len(pStorage.partitionMap)) + require.Equal(t, 3, len(pStorage.partitionMap[tableID2])) + require.Equal(t, 2, len(pStorage.tablesDDLHistory[partitionID1])) + require.Equal(t, 2, len(pStorage.tablesDDLHistory[partitionID2])) + require.Equal(t, 2, len(pStorage.tablesDDLHistory[partitionID3])) + require.Equal(t, 1, len(pStorage.tablesDDLHistory[partitionID4])) + require.Equal(t, 1, len(pStorage.tablesDDLHistory[partitionID5])) + require.Equal(t, 1, len(pStorage.tablesDDLHistory[partitionID6])) + } + + // add partition 7 + partitionID7 := partitionID6 + 100 + { + job := &model.Job{ + Type: model.ActionAddTablePartition, + SchemaID: schemaID, + TableID: tableID2, + BinlogInfo: &model.HistoryInfo{ + SchemaVersion: 109, + TableInfo: &model.TableInfo{ + ID: tableID2, + Name: model.NewCIStr("t"), + Partition: &model.PartitionInfo{ + Definitions: []model.PartitionDefinition{ + { + ID: partitionID4, + }, + { + ID: partitionID5, + }, + { + ID: partitionID6, + }, + { + ID: partitionID7, + }, + }, + }, + }, + FinishedTS: 209, + }, + } + pStorage.handleDDLJob(job) + + require.Equal(t, 4, len(pStorage.partitionMap[tableID2])) + require.Equal(t, 2, len(pStorage.tablesDDLHistory[partitionID4])) + require.Equal(t, 2, len(pStorage.tablesDDLHistory[partitionID5])) + require.Equal(t, 2, len(pStorage.tablesDDLHistory[partitionID6])) + require.Equal(t, 1, len(pStorage.tablesDDLHistory[partitionID7])) + } + + // drop patition 4 + { + job := &model.Job{ + Type: model.ActionDropTablePartition, + SchemaID: schemaID, + TableID: tableID2, + BinlogInfo: &model.HistoryInfo{ + SchemaVersion: 111, + TableInfo: &model.TableInfo{ + ID: tableID2, + Name: model.NewCIStr("t"), + Partition: &model.PartitionInfo{ + Definitions: []model.PartitionDefinition{ + { + ID: partitionID5, + }, + { + ID: partitionID6, + }, + { + ID: partitionID7, + }, + }, + }, + }, + FinishedTS: 211, + }, + } + pStorage.handleDDLJob(job) + + require.Equal(t, 3, len(pStorage.partitionMap[tableID2])) + require.Equal(t, 3, len(pStorage.tablesDDLHistory[partitionID4])) + require.Equal(t, 3, len(pStorage.tablesDDLHistory[partitionID5])) + require.Equal(t, 3, len(pStorage.tablesDDLHistory[partitionID6])) + require.Equal(t, 2, len(pStorage.tablesDDLHistory[partitionID7])) + } + + // truncate partition 5 -> 8 + partitionID8 := partitionID7 + 100 + { + job := &model.Job{ + Type: model.ActionTruncateTablePartition, + SchemaID: schemaID, + TableID: tableID2, + BinlogInfo: &model.HistoryInfo{ + SchemaVersion: 113, + TableInfo: &model.TableInfo{ + ID: tableID2, + Name: model.NewCIStr("t"), + Partition: &model.PartitionInfo{ + Definitions: []model.PartitionDefinition{ + { + ID: partitionID6, + }, + { + ID: partitionID7, + }, + { + ID: partitionID8, + }, + }, + }, + }, + FinishedTS: 213, + }, + } + pStorage.handleDDLJob(job) + + require.Equal(t, 3, len(pStorage.partitionMap[tableID2])) + require.Equal(t, 3, len(pStorage.tablesDDLHistory[partitionID4])) + require.Equal(t, 4, len(pStorage.tablesDDLHistory[partitionID5])) + require.Equal(t, 4, len(pStorage.tablesDDLHistory[partitionID6])) + require.Equal(t, 3, len(pStorage.tablesDDLHistory[partitionID7])) + require.Equal(t, 1, len(pStorage.tablesDDLHistory[partitionID8])) + } + + // // TODO: test reorganize partition(becuase its logic is same as truncate partition, ignore it now, add test after verify truncate partition) + + // drop the partition table + { + job := &model.Job{ + Type: model.ActionDropTable, + SchemaID: schemaID, + TableID: tableID2, + BinlogInfo: &model.HistoryInfo{ + SchemaVersion: 240, + TableInfo: &model.TableInfo{ + ID: tableID2, + Name: model.NewCIStr("t"), + Partition: &model.PartitionInfo{ + Definitions: []model.PartitionDefinition{ + { + ID: partitionID6, + }, + { + ID: partitionID7, + }, + { + ID: partitionID8, + }, + }, + }, + }, + FinishedTS: 300, + }, + } + pStorage.handleDDLJob(job) + + require.Equal(t, 0, len(pStorage.databaseMap[schemaID].Tables)) + require.Equal(t, 0, len(pStorage.tableMap)) + require.Equal(t, 0, len(pStorage.partitionMap)) + require.Equal(t, 3, len(pStorage.tablesDDLHistory[partitionID4])) + require.Equal(t, 4, len(pStorage.tablesDDLHistory[partitionID5])) + require.Equal(t, 5, len(pStorage.tablesDDLHistory[partitionID6])) + require.Equal(t, 4, len(pStorage.tablesDDLHistory[partitionID7])) + require.Equal(t, 2, len(pStorage.tablesDDLHistory[partitionID8])) + } + + { + ddlEvents, err := pStorage.fetchTableTriggerDDLEvents(nil, 200, 300) + require.Nil(t, err) + require.Equal(t, 2, len(ddlEvents)) + // create table event + verifyTableIsAdded(t, ddlEvents[0], partitionID1, schemaID) + verifyTableIsAdded(t, ddlEvents[0], partitionID2, schemaID) + verifyTableIsAdded(t, ddlEvents[0], partitionID3, schemaID) + + // drop table event + verifyTableIsBlocked(t, ddlEvents[1], partitionID6) + verifyTableIsBlocked(t, ddlEvents[1], partitionID7) + verifyTableIsBlocked(t, ddlEvents[1], partitionID8) + verifyTableIsDropped(t, ddlEvents[1], partitionID6) + verifyTableIsDropped(t, ddlEvents[1], partitionID7) + verifyTableIsDropped(t, ddlEvents[1], partitionID8) + } + + // partition 1, 2, 3 should be same + { + // don't fetch the create table events + ddlEvents, err := pStorage.fetchTableDDLEvents(partitionID1, nil, 201, 300) + require.Nil(t, err) + require.Equal(t, 1, len(ddlEvents)) + // truncate table event + verifyTableIsBlocked(t, ddlEvents[0], partitionID1) + verifyTableIsBlocked(t, ddlEvents[0], partitionID2) + verifyTableIsBlocked(t, ddlEvents[0], partitionID3) + verifyTableIsDropped(t, ddlEvents[0], partitionID1) + verifyTableIsDropped(t, ddlEvents[0], partitionID2) + verifyTableIsDropped(t, ddlEvents[0], partitionID3) + verifyTableIsAdded(t, ddlEvents[0], partitionID4, schemaID) + verifyTableIsAdded(t, ddlEvents[0], partitionID5, schemaID) + verifyTableIsAdded(t, ddlEvents[0], partitionID6, schemaID) + } + + // partition 4: test add partition/drop partition + { + ddlEvents, err := pStorage.fetchTableDDLEvents(partitionID4, nil, 207, 300) + require.Nil(t, err) + require.Equal(t, 2, len(ddlEvents)) + // add partition event for partition 7 + require.Equal(t, model.ActionAddTablePartition, model.ActionType(ddlEvents[0].Type)) + verifyTableIsBlocked(t, ddlEvents[0], partitionID4) + verifyTableIsBlocked(t, ddlEvents[0], partitionID5) + verifyTableIsBlocked(t, ddlEvents[0], partitionID6) + verifyTableIsAdded(t, ddlEvents[0], partitionID7, schemaID) + // drop partition event + require.Equal(t, model.ActionDropTablePartition, model.ActionType(ddlEvents[1].Type)) + verifyTableIsBlocked(t, ddlEvents[1], partitionID4) + verifyTableIsBlocked(t, ddlEvents[1], partitionID5) + verifyTableIsBlocked(t, ddlEvents[1], partitionID6) + verifyTableIsBlocked(t, ddlEvents[1], partitionID7) + verifyTableIsDropped(t, ddlEvents[1], partitionID4) + } + + // partition 5: test truncate partition + { + ddlEvents, err := pStorage.fetchTableDDLEvents(partitionID5, nil, 207, 300) + require.Nil(t, err) + require.Equal(t, 3, len(ddlEvents)) + // add partition event for partition 7 + require.Equal(t, model.ActionAddTablePartition, model.ActionType(ddlEvents[0].Type)) + // drop partition event for partition 4 + require.Equal(t, model.ActionDropTablePartition, model.ActionType(ddlEvents[1].Type)) + // truncate partition event + require.Equal(t, model.ActionTruncateTablePartition, model.ActionType(ddlEvents[2].Type)) + verifyTableIsBlocked(t, ddlEvents[2], partitionID5) + verifyTableIsBlocked(t, ddlEvents[2], partitionID6) + verifyTableIsBlocked(t, ddlEvents[2], partitionID7) + verifyTableIsDropped(t, ddlEvents[2], partitionID5) + verifyTableIsAdded(t, ddlEvents[2], partitionID8, schemaID) + } + + // partiont 6 should be same + { + ddlEvents, err := pStorage.fetchTableDDLEvents(partitionID6, nil, 207, 300) + require.Nil(t, err) + require.Equal(t, 4, len(ddlEvents)) + require.Equal(t, model.ActionAddTablePartition, model.ActionType(ddlEvents[0].Type)) + require.Equal(t, model.ActionDropTablePartition, model.ActionType(ddlEvents[1].Type)) + require.Equal(t, model.ActionTruncateTablePartition, model.ActionType(ddlEvents[2].Type)) + require.Equal(t, model.ActionDropTable, model.ActionType(ddlEvents[3].Type)) + } + + // partition 7 + { + ddlEvents, err := pStorage.fetchTableDDLEvents(partitionID7, nil, 209, 300) + require.Nil(t, err) + require.Equal(t, 3, len(ddlEvents)) + require.Equal(t, model.ActionDropTablePartition, model.ActionType(ddlEvents[0].Type)) + require.Equal(t, model.ActionTruncateTablePartition, model.ActionType(ddlEvents[1].Type)) + require.Equal(t, model.ActionDropTable, model.ActionType(ddlEvents[2].Type)) + } + + // partition 8 + { + ddlEvents, err := pStorage.fetchTableDDLEvents(partitionID8, nil, 209, 300) + require.Nil(t, err) + require.Equal(t, 2, len(ddlEvents)) + require.Equal(t, model.ActionTruncateTablePartition, model.ActionType(ddlEvents[0].Type)) + require.Equal(t, model.ActionDropTable, model.ActionType(ddlEvents[1].Type)) + } + + // test drop schema can clear partition info + { + pStorage.handleDDLJob(&model.Job{ + Type: model.ActionCreateTable, + SchemaID: schemaID, + TableID: tableID + 1000, + BinlogInfo: &model.HistoryInfo{ + SchemaVersion: 255, + TableInfo: &model.TableInfo{ + ID: tableID + 1000, + Name: model.NewCIStr("t100"), + Partition: &model.PartitionInfo{ + Definitions: []model.PartitionDefinition{ + { + ID: partitionID4 + 500, + }, + { + ID: partitionID5 + 500, + }, + }, + }, + }, + FinishedTS: 355, + }, + }) + require.Less(t, 0, len(pStorage.partitionMap)) + + job := &model.Job{ + Type: model.ActionDropSchema, + SchemaID: schemaID, + BinlogInfo: &model.HistoryInfo{ + SchemaVersion: 260, + DBInfo: &model.DBInfo{ + ID: schemaID, + Name: model.NewCIStr("test"), + }, + TableInfo: nil, + FinishedTS: 360, + }, + } + pStorage.handleDDLJob(job) + require.Equal(t, 0, len(pStorage.databaseMap)) + require.Equal(t, 0, len(pStorage.partitionMap)) + } +} + +func TestExchangePartitionTable(t *testing.T) { + dbPath := fmt.Sprintf("/tmp/testdb-%s", t.Name()) + err := os.RemoveAll(dbPath) + require.Nil(t, err) + + gcTs := uint64(100) + schemaID1 := int64(300) + schemaID2 := schemaID1 + 100 + databaseInfo := make(map[int64]*model.DBInfo) + databaseInfo[schemaID1] = &model.DBInfo{ + ID: schemaID1, + Name: model.NewCIStr("test"), + } + databaseInfo[schemaID2] = &model.DBInfo{ + ID: schemaID2, + Name: model.NewCIStr("test2"), + } + pStorage := newPersistentStorageForTest(dbPath, gcTs, databaseInfo) + + // create a partition table + tableID := int64(100) + partitionID1 := tableID + 100 + partitionID2 := tableID + 200 + partitionID3 := tableID + 300 + { + job := &model.Job{ + Type: model.ActionCreateTable, + SchemaID: schemaID1, + TableID: tableID, + BinlogInfo: &model.HistoryInfo{ + SchemaVersion: 101, + TableInfo: &model.TableInfo{ + ID: tableID, + Name: model.NewCIStr("t"), + Partition: &model.PartitionInfo{ + Definitions: []model.PartitionDefinition{ + { + ID: partitionID1, + }, + { + ID: partitionID2, + }, + { + ID: partitionID3, + }, + }, + }, + }, + FinishedTS: 201, + }, + } + pStorage.handleDDLJob(job) + } + + // create a normal table + tableID2 := tableID + 2000 + { + job := &model.Job{ + Type: model.ActionCreateTable, + SchemaID: schemaID2, + TableID: tableID2, + BinlogInfo: &model.HistoryInfo{ + SchemaVersion: 103, + TableInfo: &model.TableInfo{ + ID: tableID, + Name: model.NewCIStr("t2"), + }, + FinishedTS: 203, + }, + } + pStorage.handleDDLJob(job) + } + + { + require.Equal(t, 2, len(pStorage.tableMap)) + require.Equal(t, 3, len(pStorage.partitionMap[tableID])) + } + + // exchange table with partition 3 + { + job := &model.Job{ + Type: model.ActionExchangeTablePartition, + SchemaID: schemaID2, + TableID: tableID2, + BinlogInfo: &model.HistoryInfo{ + SchemaVersion: 105, + TableInfo: &model.TableInfo{ + ID: tableID, + Name: model.NewCIStr("t"), + Partition: &model.PartitionInfo{ + Definitions: []model.PartitionDefinition{ + { + ID: partitionID1, + }, + { + ID: partitionID2, + }, + { + ID: tableID2, + }, + }, + }, + }, + FinishedTS: 205, + }, + } + pStorage.handleDDLJob(job) + } + + { + // verify databaseMap is updated successfully + _, ok := pStorage.databaseMap[schemaID2].Tables[partitionID3] + require.True(t, ok) + _, ok = pStorage.databaseMap[schemaID2].Tables[tableID2] + require.False(t, ok) + // verify tableMap is updated successfully + require.Equal(t, 2, len(pStorage.tableMap)) + require.Equal(t, schemaID2, pStorage.tableMap[partitionID3].SchemaID) + _, ok = pStorage.tableMap[tableID2] + require.False(t, ok) + // verify partitionMap is updated successfully + require.Equal(t, 3, len(pStorage.partitionMap[tableID])) + _, ok = pStorage.partitionMap[tableID][tableID2] + require.True(t, ok) + _, ok = pStorage.partitionMap[tableID][partitionID3] + require.False(t, ok) + } + + // verify build table info store + { + store := newEmptyVersionedTableInfoStore(partitionID3) + pStorage.buildVersionedTableInfoStore(store) + require.Equal(t, 2, len(store.infos)) + require.Equal(t, "t", store.infos[0].info.Name.O) + require.Equal(t, "t2", store.infos[1].info.Name.O) + } + + { + store := newEmptyVersionedTableInfoStore(tableID2) + pStorage.buildVersionedTableInfoStore(store) + require.Equal(t, 2, len(store.infos)) + require.Equal(t, "t2", store.infos[0].info.Name.O) + require.Equal(t, "t", store.infos[1].info.Name.O) + } + + // verify ddl events are set correctly + { + ddlEvents, err := pStorage.fetchTableDDLEvents(partitionID3, nil, 201, 205) + require.Nil(t, err) + require.Equal(t, 1, len(ddlEvents)) + verifyTableIsBlocked(t, ddlEvents[0], partitionID3) + verifyTableIsBlocked(t, ddlEvents[0], tableID2) + + ddlEvents, err = pStorage.fetchTableDDLEvents(tableID2, nil, 203, 205) + require.Nil(t, err) + require.Equal(t, 1, len(ddlEvents)) + } + + // test filter: normal table is filtered out + { + filterConfig := &config.FilterConfig{ + Rules: []string{"test.t"}, + } + tableFilter, err := filter.NewFilter(filterConfig, "", false) + require.Nil(t, err) + ddlEvents, err := pStorage.fetchTableDDLEvents(partitionID3, tableFilter, 201, 205) + require.Nil(t, err) + require.Equal(t, 1, len(ddlEvents)) + require.Nil(t, ddlEvents[0].BlockedTables) + + verifyTableIsDropped(t, ddlEvents[0], partitionID3) + + verifyTableIsAdded(t, ddlEvents[0], tableID2, schemaID1) + } + + // test filter: partition table is filtered out + { + filterConfig := &config.FilterConfig{ + Rules: []string{"test2.t2"}, + } + tableFilter, err := filter.NewFilter(filterConfig, "", false) + require.Nil(t, err) + ddlEvents, err := pStorage.fetchTableDDLEvents(tableID2, tableFilter, 203, 205) + require.Nil(t, err) + require.Equal(t, 1, len(ddlEvents)) + require.Nil(t, ddlEvents[0].BlockedTables) + + verifyTableIsDropped(t, ddlEvents[0], tableID2) + + verifyTableIsAdded(t, ddlEvents[0], partitionID3, schemaID2) + } +} + +func TestAlterBetweenPartitionTableAndNonPartitionTable(t *testing.T) { + +} + +func verifyTableIsBlocked(t *testing.T, event common.DDLEvent, tableID int64) { + require.Equal(t, common.InfluenceTypeNormal, event.BlockedTables.InfluenceType) + for _, id := range event.BlockedTables.TableIDs { + if id == tableID { + return + } + } + log.Info("blocked tables", + zap.Any("type", event.Type), + zap.Any("blocked tables", event.BlockedTables), + zap.Int64("tableID", tableID)) + require.True(t, false) +} + +func verifyTableIsDropped(t *testing.T, event common.DDLEvent, tableID int64) { + require.Equal(t, common.InfluenceTypeNormal, event.NeedDroppedTables.InfluenceType) + for _, id := range event.NeedDroppedTables.TableIDs { + if id == tableID { + return + } + } + require.True(t, false) +} - require.Equal(t, 0, len(pStorage.databaseMap)) - require.Equal(t, 0, len(pStorage.tableMap)) - require.Equal(t, 5, len(pStorage.tableTriggerDDLHistory)) - require.Equal(t, uint64(300), pStorage.tableTriggerDDLHistory[4]) - require.Equal(t, 3, len(pStorage.tablesDDLHistory)) - require.Equal(t, 2, len(pStorage.tablesDDLHistory[tableID])) - require.Equal(t, 2, len(pStorage.tablesDDLHistory[tableID2])) - require.Equal(t, 2, len(pStorage.tablesDDLHistory[tableID3])) - require.Equal(t, uint64(300), pStorage.tablesDDLHistory[tableID3][1]) +func verifyTableIsAdded(t *testing.T, event common.DDLEvent, tableID int64, schemaID int64) { + for _, table := range event.NeedAddedTables { + if table.TableID == tableID && table.SchemaID == schemaID { + return + } } + require.True(t, false) } func TestHandleRenameTable(t *testing.T) { @@ -547,21 +1307,12 @@ func TestHandleRenameTable(t *testing.T) { require.Equal(t, 1, len(ddlEvents)) // rename table event require.Equal(t, uint64(605), ddlEvents[0].FinishedTs) - require.Equal(t, "test2", ddlEvents[0].SchemaName) - require.Equal(t, "t2", ddlEvents[0].TableName) - require.Equal(t, common.InfluenceTypeNormal, ddlEvents[0].BlockedTables.InfluenceType) - require.Equal(t, schemaID1, ddlEvents[0].BlockedTables.SchemaID) - require.Equal(t, tableID, ddlEvents[0].BlockedTables.TableIDs[0]) - // TODO: don't count on the order - require.Equal(t, heartbeatpb.DDLSpan.TableID, ddlEvents[0].BlockedTables.TableIDs[1]) - require.Equal(t, tableID, ddlEvents[0].NeedDroppedTables.TableIDs[0]) + verifyTableIsBlocked(t, ddlEvents[0], tableID) + verifyTableIsBlocked(t, ddlEvents[0], heartbeatpb.DDLSpan.TableID) - require.Equal(t, tableID, ddlEvents[0].NeedAddedTables[0].TableID) - - require.Equal(t, "test2", ddlEvents[0].TableNameChange.AddName[0].SchemaName) - require.Equal(t, "t2", ddlEvents[0].TableNameChange.AddName[0].TableName) - require.Equal(t, "test", ddlEvents[0].TableNameChange.DropName[0].SchemaName) - require.Equal(t, "t1", ddlEvents[0].TableNameChange.DropName[0].TableName) + require.Equal(t, tableID, ddlEvents[0].UpdatedSchemas[0].TableID) + require.Equal(t, schemaID1, ddlEvents[0].UpdatedSchemas[0].OldSchemaID) + require.Equal(t, schemaID2, ddlEvents[0].UpdatedSchemas[0].NewSchemaID) } // test filter: after rename, the table is filtered out @@ -574,12 +1325,10 @@ func TestHandleRenameTable(t *testing.T) { ddlEvents, err := pStorage.fetchTableDDLEvents(tableID, tableFilter, 601, 700) require.Nil(t, err) require.Equal(t, 1, len(ddlEvents)) - require.Equal(t, common.InfluenceTypeNormal, ddlEvents[0].BlockedTables.InfluenceType) - require.Equal(t, schemaID1, ddlEvents[0].BlockedTables.SchemaID) - require.Equal(t, tableID, ddlEvents[0].BlockedTables.TableIDs[0]) - // TODO: don't count on the order - require.Equal(t, heartbeatpb.DDLSpan.TableID, ddlEvents[0].BlockedTables.TableIDs[1]) - require.Equal(t, tableID, ddlEvents[0].NeedDroppedTables.TableIDs[0]) + verifyTableIsBlocked(t, ddlEvents[0], tableID) + verifyTableIsBlocked(t, ddlEvents[0], heartbeatpb.DDLSpan.TableID) + + verifyTableIsDropped(t, ddlEvents[0], tableID) require.Nil(t, ddlEvents[0].NeedAddedTables) @@ -601,7 +1350,7 @@ func TestHandleRenameTable(t *testing.T) { require.Nil(t, triggerDDLEvents[0].BlockedTables) require.Nil(t, triggerDDLEvents[0].NeedDroppedTables) - require.Equal(t, tableID, triggerDDLEvents[0].NeedAddedTables[0].TableID) + verifyTableIsAdded(t, triggerDDLEvents[0], tableID, schemaID2) require.Equal(t, "test2", triggerDDLEvents[0].TableNameChange.AddName[0].SchemaName) require.Equal(t, "t2", triggerDDLEvents[0].TableNameChange.AddName[0].TableName) @@ -622,6 +1371,269 @@ func TestHandleRenameTable(t *testing.T) { } } +func TestHandleRenamePartitionTable(t *testing.T) { + dbPath := fmt.Sprintf("/tmp/testdb-%s", t.Name()) + err := os.RemoveAll(dbPath) + require.Nil(t, err) + + gcTs := uint64(500) + schemaID1 := int64(300) + schemaID2 := int64(305) + + databaseInfo := make(map[int64]*model.DBInfo) + databaseInfo[schemaID1] = &model.DBInfo{ + ID: schemaID1, + Name: model.NewCIStr("test"), + } + databaseInfo[schemaID2] = &model.DBInfo{ + ID: schemaID2, + Name: model.NewCIStr("test2"), + } + pStorage := newPersistentStorageForTest(dbPath, gcTs, databaseInfo) + + // create a table + tableID := int64(100) + partitionID1 := tableID + 100 + partitionID2 := tableID + 200 + partitionID3 := tableID + 300 + { + job := &model.Job{ + Type: model.ActionCreateTable, + SchemaID: schemaID1, + TableID: tableID, + BinlogInfo: &model.HistoryInfo{ + SchemaVersion: 501, + TableInfo: &model.TableInfo{ + ID: tableID, + Name: model.NewCIStr("t1"), + Partition: &model.PartitionInfo{ + Definitions: []model.PartitionDefinition{ + { + ID: partitionID1, + }, + { + ID: partitionID2, + }, + { + ID: partitionID3, + }, + }, + }, + }, + FinishedTS: 601, + }, + } + pStorage.handleDDLJob(job) + } + + // rename table to a different db + { + job := &model.Job{ + Type: model.ActionRenameTable, + SchemaID: schemaID2, + TableID: tableID, + BinlogInfo: &model.HistoryInfo{ + SchemaVersion: 505, + TableInfo: &model.TableInfo{ + ID: tableID, + Name: model.NewCIStr("t2"), + Partition: &model.PartitionInfo{ + Definitions: []model.PartitionDefinition{ + { + ID: partitionID1, + }, + { + ID: partitionID2, + }, + { + ID: partitionID3, + }, + }, + }, + }, + FinishedTS: 605, + }, + } + pStorage.handleDDLJob(job) + } + + { + ddlEvents, err := pStorage.fetchTableDDLEvents(partitionID1, nil, 601, 700) + require.Nil(t, err) + require.Equal(t, 1, len(ddlEvents)) + // rename table event + require.Equal(t, uint64(605), ddlEvents[0].FinishedTs) + verifyTableIsBlocked(t, ddlEvents[0], partitionID1) + verifyTableIsBlocked(t, ddlEvents[0], partitionID2) + verifyTableIsBlocked(t, ddlEvents[0], partitionID3) + verifyTableIsBlocked(t, ddlEvents[0], heartbeatpb.DDLSpan.TableID) + + require.Equal(t, 3, len(ddlEvents[0].UpdatedSchemas)) + } + + // test filter: after rename, the table is filtered out + { + filterConfig := &config.FilterConfig{ + Rules: []string{"test.*"}, + } + tableFilter, err := filter.NewFilter(filterConfig, "", false) + require.Nil(t, err) + ddlEvents, err := pStorage.fetchTableDDLEvents(partitionID1, tableFilter, 601, 700) + require.Nil(t, err) + require.Equal(t, 1, len(ddlEvents)) + verifyTableIsBlocked(t, ddlEvents[0], partitionID1) + verifyTableIsBlocked(t, ddlEvents[0], partitionID2) + verifyTableIsBlocked(t, ddlEvents[0], partitionID3) + verifyTableIsBlocked(t, ddlEvents[0], heartbeatpb.DDLSpan.TableID) + + verifyTableIsDropped(t, ddlEvents[0], partitionID1) + verifyTableIsDropped(t, ddlEvents[0], partitionID2) + verifyTableIsDropped(t, ddlEvents[0], partitionID3) + + require.Nil(t, ddlEvents[0].NeedAddedTables) + + require.Equal(t, 0, len(ddlEvents[0].TableNameChange.AddName)) + require.Equal(t, "test", ddlEvents[0].TableNameChange.DropName[0].SchemaName) + require.Equal(t, "t1", ddlEvents[0].TableNameChange.DropName[0].TableName) + } + + // test filter: before rename, the table is filtered out, so only table trigger can get the event + { + filterConfig := &config.FilterConfig{ + Rules: []string{"test2.*"}, + } + tableFilter, err := filter.NewFilter(filterConfig, "", false) + require.Nil(t, err) + triggerDDLEvents, err := pStorage.fetchTableTriggerDDLEvents(tableFilter, 601, 10) + require.Nil(t, err) + require.Equal(t, 1, len(triggerDDLEvents)) + require.Nil(t, triggerDDLEvents[0].BlockedTables) + require.Nil(t, triggerDDLEvents[0].NeedDroppedTables) + + verifyTableIsAdded(t, triggerDDLEvents[0], partitionID1, schemaID2) + verifyTableIsAdded(t, triggerDDLEvents[0], partitionID2, schemaID2) + verifyTableIsAdded(t, triggerDDLEvents[0], partitionID3, schemaID2) + + require.Equal(t, "test2", triggerDDLEvents[0].TableNameChange.AddName[0].SchemaName) + require.Equal(t, "t2", triggerDDLEvents[0].TableNameChange.AddName[0].TableName) + require.Equal(t, 0, len(triggerDDLEvents[0].TableNameChange.DropName)) + } +} + +func TestCreateTables(t *testing.T) { + dbPath := fmt.Sprintf("/tmp/testdb-%s", t.Name()) + err := os.RemoveAll(dbPath) + require.Nil(t, err) + + gcTs := uint64(500) + schemaID := int64(300) + + databaseInfo := make(map[int64]*model.DBInfo) + databaseInfo[schemaID] = &model.DBInfo{ + ID: schemaID, + Name: model.NewCIStr("test"), + } + pStorage := newPersistentStorageForTest(dbPath, gcTs, databaseInfo) + + // create tables + tableID1 := int64(100) + tableID2 := tableID1 + 100 + tableID3 := tableID1 + 200 + { + job := &model.Job{ + Type: model.ActionCreateTables, + SchemaID: schemaID, + Query: "sql1;sql2;sql3", + BinlogInfo: &model.HistoryInfo{ + SchemaVersion: 501, + MultipleTableInfos: []*model.TableInfo{ + { + ID: tableID1, + Name: model.NewCIStr("t1"), + }, + { + ID: tableID2, + Name: model.NewCIStr("t2"), + }, + { + ID: tableID3, + Name: model.NewCIStr("t3"), + }, + }, + FinishedTS: 601, + }, + } + pStorage.handleDDLJob(job) + } + + { + require.Equal(t, 3, len(pStorage.databaseMap[schemaID].Tables)) + require.Equal(t, 3, len(pStorage.tableMap)) + } + + { + store := newEmptyVersionedTableInfoStore(tableID1) + pStorage.buildVersionedTableInfoStore(store) + require.Equal(t, 1, len(store.infos)) + require.Equal(t, "t1", store.infos[0].info.Name.O) + } + + { + store := newEmptyVersionedTableInfoStore(tableID2) + pStorage.buildVersionedTableInfoStore(store) + require.Equal(t, 1, len(store.infos)) + require.Equal(t, "t2", store.infos[0].info.Name.O) + } + + { + store := newEmptyVersionedTableInfoStore(tableID3) + pStorage.buildVersionedTableInfoStore(store) + require.Equal(t, 1, len(store.infos)) + require.Equal(t, "t3", store.infos[0].info.Name.O) + } + + { + ddlEvents, err := pStorage.fetchTableTriggerDDLEvents(nil, 600, 601) + require.Nil(t, err) + require.Equal(t, 1, len(ddlEvents)) + + verifyTableIsAdded(t, ddlEvents[0], tableID1, schemaID) + verifyTableIsAdded(t, ddlEvents[0], tableID2, schemaID) + verifyTableIsAdded(t, ddlEvents[0], tableID3, schemaID) + } + + // filter t2 and t3 + { + filterConfig := &config.FilterConfig{ + Rules: []string{"test.t1"}, + } + tableFilter, err := filter.NewFilter(filterConfig, "", false) + require.Nil(t, err) + ddlEvents, err := pStorage.fetchTableTriggerDDLEvents(tableFilter, 600, 601) + require.Nil(t, err) + require.Equal(t, 1, len(ddlEvents)) + + verifyTableIsAdded(t, ddlEvents[0], tableID1, schemaID) + require.Equal(t, 1, len(ddlEvents[0].NeedAddedTables)) + } + + // all filtered out + { + filterConfig := &config.FilterConfig{ + Rules: []string{"test2.*"}, + } + tableFilter, err := filter.NewFilter(filterConfig, "", false) + require.Nil(t, err) + ddlEvents, err := pStorage.fetchTableTriggerDDLEvents(tableFilter, 600, 601) + require.Nil(t, err) + require.Equal(t, 0, len(ddlEvents)) + } +} + +func TestRenameTables(t *testing.T) { + +} + func TestFetchDDLEventsBasic(t *testing.T) { dbPath := fmt.Sprintf("/tmp/testdb-%s", t.Name()) err := os.RemoveAll(dbPath) @@ -771,7 +1783,6 @@ func TestFetchDDLEventsBasic(t *testing.T) { require.Equal(t, "test", ddlEvents[1].SchemaName) require.Equal(t, "t2", ddlEvents[1].TableName) require.Equal(t, common.InfluenceTypeNormal, ddlEvents[1].NeedDroppedTables.InfluenceType) - require.Equal(t, schemaID, ddlEvents[1].NeedDroppedTables.SchemaID) require.Equal(t, 1, len(ddlEvents[1].NeedDroppedTables.TableIDs)) require.Equal(t, tableID, ddlEvents[1].NeedDroppedTables.TableIDs[0]) require.Equal(t, 1, len(ddlEvents[1].NeedAddedTables)) @@ -801,7 +1812,6 @@ func TestFetchDDLEventsBasic(t *testing.T) { require.Equal(t, common.InfluenceTypeNormal, ddlEvents[0].NeedDroppedTables.InfluenceType) require.Equal(t, 1, len(ddlEvents[0].NeedDroppedTables.TableIDs)) require.Equal(t, tableID3, ddlEvents[0].NeedDroppedTables.TableIDs[0]) - require.Equal(t, schemaID, ddlEvents[0].NeedDroppedTables.SchemaID) } // fetch all table trigger ddl events @@ -825,7 +1835,6 @@ func TestFetchDDLEventsBasic(t *testing.T) { // drop table event require.Equal(t, uint64(611), tableTriggerDDLEvents[4].FinishedTs) require.Equal(t, common.InfluenceTypeNormal, tableTriggerDDLEvents[4].NeedDroppedTables.InfluenceType) - require.Equal(t, schemaID, tableTriggerDDLEvents[4].NeedDroppedTables.SchemaID) require.Equal(t, tableID3, tableTriggerDDLEvents[4].NeedDroppedTables.TableIDs[0]) require.Equal(t, schemaName, tableTriggerDDLEvents[4].TableNameChange.DropName[0].SchemaName) require.Equal(t, "t3", tableTriggerDDLEvents[4].TableNameChange.DropName[0].TableName) @@ -1024,7 +2033,7 @@ func TestGetAllPhysicalTables(t *testing.T) { schemaID := int64(300) gcTs := uint64(600) tableID1 := int64(100) - tableID2 := int64(200) + tableID2 := tableID1 + 100 databaseInfo := make(map[int64]*model.DBInfo) databaseInfo[schemaID] = &model.DBInfo{ @@ -1044,7 +2053,7 @@ func TestGetAllPhysicalTables(t *testing.T) { pStorage := newPersistentStorageForTest(dbPath, gcTs, databaseInfo) // create table t3 - tableID3 := int64(500) + tableID3 := tableID2 + 100 { job := &model.Job{ Type: model.ActionCreateTable, @@ -1078,6 +2087,72 @@ func TestGetAllPhysicalTables(t *testing.T) { pStorage.handleDDLJob(job) } + // create partition table t4 + tableID4 := tableID3 + 100 + partitionID1 := tableID4 + 100 + partitionID2 := tableID4 + 200 + partitionID3 := tableID4 + 300 + { + job := &model.Job{ + Type: model.ActionCreateTable, + SchemaID: schemaID, + TableID: tableID4, + BinlogInfo: &model.HistoryInfo{ + SchemaVersion: 503, + TableInfo: &model.TableInfo{ + ID: tableID4, + Name: model.NewCIStr("t4"), + Partition: &model.PartitionInfo{ + Definitions: []model.PartitionDefinition{ + { + ID: partitionID1, + }, + { + ID: partitionID2, + }, + { + ID: partitionID3, + }, + }, + }, + }, + FinishedTS: 609, + }, + } + pStorage.handleDDLJob(job) + } + + // drop partition table t4 + { + job := &model.Job{ + Type: model.ActionDropTable, + SchemaID: schemaID, + TableID: tableID4, + BinlogInfo: &model.HistoryInfo{ + SchemaVersion: 505, + TableInfo: &model.TableInfo{ + ID: tableID4, + Name: model.NewCIStr("t4"), + Partition: &model.PartitionInfo{ + Definitions: []model.PartitionDefinition{ + { + ID: partitionID1, + }, + { + ID: partitionID2, + }, + { + ID: partitionID3, + }, + }, + }, + }, + FinishedTS: 611, + }, + } + pStorage.handleDDLJob(job) + } + { allPhysicalTables, err := pStorage.getAllPhysicalTables(600, nil) require.Nil(t, err) @@ -1095,4 +2170,22 @@ func TestGetAllPhysicalTables(t *testing.T) { require.Nil(t, err) require.Equal(t, 2, len(allPhysicalTables)) } + + { + allPhysicalTables, err := pStorage.getAllPhysicalTables(605, nil) + require.Nil(t, err) + require.Equal(t, 2, len(allPhysicalTables)) + } + + { + allPhysicalTables, err := pStorage.getAllPhysicalTables(609, nil) + require.Nil(t, err) + require.Equal(t, 5, len(allPhysicalTables)) + } + + { + allPhysicalTables, err := pStorage.getAllPhysicalTables(611, nil) + require.Nil(t, err) + require.Equal(t, 2, len(allPhysicalTables)) + } } diff --git a/logservice/schemastore/schema_store.go b/logservice/schemastore/schema_store.go index b3b8db135..3b9cc4dd6 100644 --- a/logservice/schemastore/schema_store.go +++ b/logservice/schemastore/schema_store.go @@ -140,6 +140,7 @@ func (s *schemaStore) updateResolvedTsPeriodically(ctx context.Context) error { for _, event := range resolvedEvents { if event.Job.BinlogInfo.SchemaVersion <= s.schemaVersion || event.Job.BinlogInfo.FinishedTS <= s.finishedDDLTs { log.Info("skip already applied ddl job", + zap.Any("type", event.Job.Type), zap.String("job", event.Job.Query), zap.Int64("jobSchemaVersion", event.Job.BinlogInfo.SchemaVersion), zap.Uint64("jobFinishTs", event.Job.BinlogInfo.FinishedTS), diff --git a/logservice/schemastore/types.go b/logservice/schemastore/types.go index 2056128a9..c3d0d3991 100644 --- a/logservice/schemastore/types.go +++ b/logservice/schemastore/types.go @@ -11,23 +11,22 @@ type PersistedDDLEvent struct { ID int64 `msg:"id"` Type byte `msg:"type"` - // SchemaID means different for different job types: - // - ExchangeTablePartition: db id of non-partitioned table - // TableID means different for different job types: - // - ExchangeTablePartition: non-partitioned table id - // For truncate table, it it the table id of the newly created table - + // for exchange partition, it is the info of the partition table CurrentSchemaID int64 `msg:"current_schema_id"` CurrentTableID int64 `msg:"current_table_id"` CurrentSchemaName string `msg:"current_schema_name"` CurrentTableName string `msg:"current_table_name"` - // The following fields are only set when the ddl job involves a prev table and the corresponding fields change + // The following fields are only set when the ddl job involves a prev table + // for exchange partition, it is the info of the normal table before exchange PrevSchemaID int64 `msg:"prev_schema_id"` PrevTableID int64 `msg:"prev_table_id"` PrevSchemaName string `msg:"prev_schema_name"` PrevTableName string `msg:"prev_table_name"` + // The following fields are only set when the ddl job involves a partition table + PrevPartitions []int64 `msg:"prev_partitions"` + Query string `msg:"query"` SchemaVersion int64 `msg:"schema_version"` DBInfo *model.DBInfo `msg:"-"` @@ -35,6 +34,10 @@ type PersistedDDLEvent struct { // TODO: use a custom struct to store the table info? TableInfoValue []byte `msg:"table_info_value"` FinishedTs uint64 `msg:"finished_ts"` + + MultipleTableInfos []*model.TableInfo `msg:"-"` + MultipleTableInfosValue [][]byte `msg:"multi_table_info_value"` + // TODO: do we need the following two fields? BDRRole string `msg:"bdr_role"` CDCWriteSource uint64 `msg:"cdc_write_source"` @@ -65,6 +68,8 @@ type BasicTableInfo struct { Name string } +type BasicPartitionInfo map[int64]interface{} + //msgp:ignore DDLJobWithCommitTs type DDLJobWithCommitTs struct { Job *model.Job diff --git a/logservice/schemastore/types_gen.go b/logservice/schemastore/types_gen.go index daef63883..439509cab 100644 --- a/logservice/schemastore/types_gen.go +++ b/logservice/schemastore/types_gen.go @@ -84,6 +84,25 @@ func (z *PersistedDDLEvent) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "PrevTableName") return } + case "prev_partitions": + var zb0002 uint32 + zb0002, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "PrevPartitions") + return + } + if cap(z.PrevPartitions) >= int(zb0002) { + z.PrevPartitions = (z.PrevPartitions)[:zb0002] + } else { + z.PrevPartitions = make([]int64, zb0002) + } + for za0001 := range z.PrevPartitions { + z.PrevPartitions[za0001], err = dc.ReadInt64() + if err != nil { + err = msgp.WrapError(err, "PrevPartitions", za0001) + return + } + } case "query": z.Query, err = dc.ReadString() if err != nil { @@ -108,6 +127,25 @@ func (z *PersistedDDLEvent) DecodeMsg(dc *msgp.Reader) (err error) { err = msgp.WrapError(err, "FinishedTs") return } + case "multi_table_info_value": + var zb0003 uint32 + zb0003, err = dc.ReadArrayHeader() + if err != nil { + err = msgp.WrapError(err, "MultipleTableInfosValue") + return + } + if cap(z.MultipleTableInfosValue) >= int(zb0003) { + z.MultipleTableInfosValue = (z.MultipleTableInfosValue)[:zb0003] + } else { + z.MultipleTableInfosValue = make([][]byte, zb0003) + } + for za0002 := range z.MultipleTableInfosValue { + z.MultipleTableInfosValue[za0002], err = dc.ReadBytes(z.MultipleTableInfosValue[za0002]) + if err != nil { + err = msgp.WrapError(err, "MultipleTableInfosValue", za0002) + return + } + } case "bdr_role": z.BDRRole, err = dc.ReadString() if err != nil { @@ -133,9 +171,9 @@ func (z *PersistedDDLEvent) DecodeMsg(dc *msgp.Reader) (err error) { // EncodeMsg implements msgp.Encodable func (z *PersistedDDLEvent) EncodeMsg(en *msgp.Writer) (err error) { - // map header, size 16 + // map header, size 18 // write "id" - err = en.Append(0xde, 0x0, 0x10, 0xa2, 0x69, 0x64) + err = en.Append(0xde, 0x0, 0x12, 0xa2, 0x69, 0x64) if err != nil { return } @@ -234,6 +272,23 @@ func (z *PersistedDDLEvent) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "PrevTableName") return } + // write "prev_partitions" + err = en.Append(0xaf, 0x70, 0x72, 0x65, 0x76, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(len(z.PrevPartitions))) + if err != nil { + err = msgp.WrapError(err, "PrevPartitions") + return + } + for za0001 := range z.PrevPartitions { + err = en.WriteInt64(z.PrevPartitions[za0001]) + if err != nil { + err = msgp.WrapError(err, "PrevPartitions", za0001) + return + } + } // write "query" err = en.Append(0xa5, 0x71, 0x75, 0x65, 0x72, 0x79) if err != nil { @@ -274,6 +329,23 @@ func (z *PersistedDDLEvent) EncodeMsg(en *msgp.Writer) (err error) { err = msgp.WrapError(err, "FinishedTs") return } + // write "multi_table_info_value" + err = en.Append(0xb6, 0x6d, 0x75, 0x6c, 0x74, 0x69, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65) + if err != nil { + return + } + err = en.WriteArrayHeader(uint32(len(z.MultipleTableInfosValue))) + if err != nil { + err = msgp.WrapError(err, "MultipleTableInfosValue") + return + } + for za0002 := range z.MultipleTableInfosValue { + err = en.WriteBytes(z.MultipleTableInfosValue[za0002]) + if err != nil { + err = msgp.WrapError(err, "MultipleTableInfosValue", za0002) + return + } + } // write "bdr_role" err = en.Append(0xa8, 0x62, 0x64, 0x72, 0x5f, 0x72, 0x6f, 0x6c, 0x65) if err != nil { @@ -300,9 +372,9 @@ func (z *PersistedDDLEvent) EncodeMsg(en *msgp.Writer) (err error) { // MarshalMsg implements msgp.Marshaler func (z *PersistedDDLEvent) MarshalMsg(b []byte) (o []byte, err error) { o = msgp.Require(b, z.Msgsize()) - // map header, size 16 + // map header, size 18 // string "id" - o = append(o, 0xde, 0x0, 0x10, 0xa2, 0x69, 0x64) + o = append(o, 0xde, 0x0, 0x12, 0xa2, 0x69, 0x64) o = msgp.AppendInt64(o, z.ID) // string "type" o = append(o, 0xa4, 0x74, 0x79, 0x70, 0x65) @@ -331,6 +403,12 @@ func (z *PersistedDDLEvent) MarshalMsg(b []byte) (o []byte, err error) { // string "prev_table_name" o = append(o, 0xaf, 0x70, 0x72, 0x65, 0x76, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65) o = msgp.AppendString(o, z.PrevTableName) + // string "prev_partitions" + o = append(o, 0xaf, 0x70, 0x72, 0x65, 0x76, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73) + o = msgp.AppendArrayHeader(o, uint32(len(z.PrevPartitions))) + for za0001 := range z.PrevPartitions { + o = msgp.AppendInt64(o, z.PrevPartitions[za0001]) + } // string "query" o = append(o, 0xa5, 0x71, 0x75, 0x65, 0x72, 0x79) o = msgp.AppendString(o, z.Query) @@ -343,6 +421,12 @@ func (z *PersistedDDLEvent) MarshalMsg(b []byte) (o []byte, err error) { // string "finished_ts" o = append(o, 0xab, 0x66, 0x69, 0x6e, 0x69, 0x73, 0x68, 0x65, 0x64, 0x5f, 0x74, 0x73) o = msgp.AppendUint64(o, z.FinishedTs) + // string "multi_table_info_value" + o = append(o, 0xb6, 0x6d, 0x75, 0x6c, 0x74, 0x69, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65) + o = msgp.AppendArrayHeader(o, uint32(len(z.MultipleTableInfosValue))) + for za0002 := range z.MultipleTableInfosValue { + o = msgp.AppendBytes(o, z.MultipleTableInfosValue[za0002]) + } // string "bdr_role" o = append(o, 0xa8, 0x62, 0x64, 0x72, 0x5f, 0x72, 0x6f, 0x6c, 0x65) o = msgp.AppendString(o, z.BDRRole) @@ -430,6 +514,25 @@ func (z *PersistedDDLEvent) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "PrevTableName") return } + case "prev_partitions": + var zb0002 uint32 + zb0002, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "PrevPartitions") + return + } + if cap(z.PrevPartitions) >= int(zb0002) { + z.PrevPartitions = (z.PrevPartitions)[:zb0002] + } else { + z.PrevPartitions = make([]int64, zb0002) + } + for za0001 := range z.PrevPartitions { + z.PrevPartitions[za0001], bts, err = msgp.ReadInt64Bytes(bts) + if err != nil { + err = msgp.WrapError(err, "PrevPartitions", za0001) + return + } + } case "query": z.Query, bts, err = msgp.ReadStringBytes(bts) if err != nil { @@ -454,6 +557,25 @@ func (z *PersistedDDLEvent) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "FinishedTs") return } + case "multi_table_info_value": + var zb0003 uint32 + zb0003, bts, err = msgp.ReadArrayHeaderBytes(bts) + if err != nil { + err = msgp.WrapError(err, "MultipleTableInfosValue") + return + } + if cap(z.MultipleTableInfosValue) >= int(zb0003) { + z.MultipleTableInfosValue = (z.MultipleTableInfosValue)[:zb0003] + } else { + z.MultipleTableInfosValue = make([][]byte, zb0003) + } + for za0002 := range z.MultipleTableInfosValue { + z.MultipleTableInfosValue[za0002], bts, err = msgp.ReadBytesBytes(bts, z.MultipleTableInfosValue[za0002]) + if err != nil { + err = msgp.WrapError(err, "MultipleTableInfosValue", za0002) + return + } + } case "bdr_role": z.BDRRole, bts, err = msgp.ReadStringBytes(bts) if err != nil { @@ -480,7 +602,11 @@ func (z *PersistedDDLEvent) UnmarshalMsg(bts []byte) (o []byte, err error) { // Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message func (z *PersistedDDLEvent) Msgsize() (s int) { - s = 3 + 3 + msgp.Int64Size + 5 + msgp.ByteSize + 18 + msgp.Int64Size + 17 + msgp.Int64Size + 20 + msgp.StringPrefixSize + len(z.CurrentSchemaName) + 19 + msgp.StringPrefixSize + len(z.CurrentTableName) + 15 + msgp.Int64Size + 14 + msgp.Int64Size + 17 + msgp.StringPrefixSize + len(z.PrevSchemaName) + 16 + msgp.StringPrefixSize + len(z.PrevTableName) + 6 + msgp.StringPrefixSize + len(z.Query) + 15 + msgp.Int64Size + 17 + msgp.BytesPrefixSize + len(z.TableInfoValue) + 12 + msgp.Uint64Size + 9 + msgp.StringPrefixSize + len(z.BDRRole) + 17 + msgp.Uint64Size + s = 3 + 3 + msgp.Int64Size + 5 + msgp.ByteSize + 18 + msgp.Int64Size + 17 + msgp.Int64Size + 20 + msgp.StringPrefixSize + len(z.CurrentSchemaName) + 19 + msgp.StringPrefixSize + len(z.CurrentTableName) + 15 + msgp.Int64Size + 14 + msgp.Int64Size + 17 + msgp.StringPrefixSize + len(z.PrevSchemaName) + 16 + msgp.StringPrefixSize + len(z.PrevTableName) + 16 + msgp.ArrayHeaderSize + (len(z.PrevPartitions) * (msgp.Int64Size)) + 6 + msgp.StringPrefixSize + len(z.Query) + 15 + msgp.Int64Size + 17 + msgp.BytesPrefixSize + len(z.TableInfoValue) + 12 + msgp.Uint64Size + 23 + msgp.ArrayHeaderSize + for za0002 := range z.MultipleTableInfosValue { + s += msgp.BytesPrefixSize + len(z.MultipleTableInfosValue[za0002]) + } + s += 9 + msgp.StringPrefixSize + len(z.BDRRole) + 17 + msgp.Uint64Size return } diff --git a/pkg/common/event.go b/pkg/common/event.go index dc0d45c18..a28353de8 100644 --- a/pkg/common/event.go +++ b/pkg/common/event.go @@ -356,6 +356,12 @@ type TableNameChange struct { DropDatabaseName string } +type SchemaIDChange struct { + TableID int64 + OldSchemaID int64 + NewSchemaID int64 +} + type DDLEvent struct { DispatcherID DispatcherID `json:"dispatcher_id"` Type byte `json:"type"` @@ -364,8 +370,7 @@ type DDLEvent struct { SchemaID int64 `json:"schema_id"` // TableID means different for different job types: // - ExchangeTablePartition: non-partitioned table id - TableID int64 `json:"table_id"` - // TODO: need verify the meaning of SchemaName and TableName for different ddl + TableID int64 `json:"table_id"` SchemaName string `json:"schema_name"` TableName string `json:"table_name"` Query string `json:"query"` @@ -376,14 +381,21 @@ type DDLEvent struct { MultipleTableInfos []*TableInfo `json:"multiple_table_infos"` BlockedTables *InfluencedTables `json:"blocked_tables"` + UpdatedSchemas []SchemaIDChange `json:"updated_schemas"` NeedDroppedTables *InfluencedTables `json:"need_dropped_tables"` NeedAddedTables []Table `json:"need_added_tables"` - TiDBOnly bool `json:"tidb_only"` - - // only Create Table / Create Tables / Drop Table / Rename Table / - // Rename Tables / Drop Schema / Recover Table will make the table name change + // DDLs which may change table name: + // Create Table + // Create Tables + // Drop Table + // Rename Table + // Rename Tables + // Drop Schema + // Recover Table TableNameChange *TableNameChange `json:"table_name_change"` + + TiDBOnly bool `json:"tidb_only"` // 用于在event flush 后执行,后续兼容不同下游的时候要看是不是要拆下去 PostTxnFlushed []func() `msg:"-"` }