From 579ea47ed407c09d34dcaf1b58c1405fc984d401 Mon Sep 17 00:00:00 2001
From: lidezhu <47731263+lidezhu@users.noreply.github.com>
Date: Tue, 8 Oct 2024 23:23:12 +0800
Subject: [PATCH] schemastore: support create tables for partition table
 (#351)

* initialize partition map

* add some log

* fix

* small refactor

* small fix

* support create tables for partition table
---
 logservice/schemastore/disk_format.go     |  47 ++---
 logservice/schemastore/multi_version.go   |  34 ++-
 logservice/schemastore/persist_storage.go | 196 +++++++++++-------
 .../schemastore/persist_storage_test.go    | 171 ++++++++++++++-
 4 files changed, 335 insertions(+), 113 deletions(-)

diff --git a/logservice/schemastore/disk_format.go b/logservice/schemastore/disk_format.go
index e79ff5118..77b6ee459 100644
--- a/logservice/schemastore/disk_format.go
+++ b/logservice/schemastore/disk_format.go
@@ -271,15 +271,7 @@ func loadAndApplyDDLHistory(
 	}
 	defer snapIter.Close()
 	for snapIter.First(); snapIter.Valid(); snapIter.Next() {
-		var ddlEvent PersistedDDLEvent
-		if _, err = ddlEvent.UnmarshalMsg(snapIter.Value()); err != nil {
-			log.Fatal("unmarshal ddl job failed", zap.Error(err))
-		}
-		ddlEvent.TableInfo = &model.TableInfo{}
-		if err := json.Unmarshal(ddlEvent.TableInfoValue, &ddlEvent.TableInfo); err != nil {
-			log.Fatal("unmarshal table info failed", zap.Error(err))
-		}
-
+		ddlEvent := unmarshalPersistedDDLEvent(snapIter.Value())
 		if shouldSkipDDL(&ddlEvent, databaseMap, tableMap) {
 			continue
 		}
@@ -326,18 +318,9 @@ func readTableInfoInKVSnap(snap *pebble.Snapshot, tableID int64, version uint64)
 	return common.WrapTableInfo(table_info_entry.SchemaID, table_info_entry.SchemaName, version, tableInfo)
 }
 
-func readPersistedDDLEvent(snap *pebble.Snapshot, version uint64) PersistedDDLEvent {
-	ddlKey, err := ddlJobKey(version)
-	if err != nil {
-		log.Fatal("generate ddl job key failed", zap.Error(err))
-	}
-	ddlValue, closer, err := snap.Get(ddlKey)
-	if err != nil {
-		log.Fatal("get ddl job failed", zap.Error(err))
-	}
-	defer closer.Close()
+func unmarshalPersistedDDLEvent(value []byte) PersistedDDLEvent {
 	var ddlEvent PersistedDDLEvent
-	if _, err := ddlEvent.UnmarshalMsg(ddlValue); err != nil {
+	if _, err := ddlEvent.UnmarshalMsg(value); err != nil {
 		log.Fatal("unmarshal ddl job failed", zap.Error(err))
 	}
 
@@ -359,6 +342,19 @@ func readPersistedDDLEvent(snap *pebble.Snapshot, version uint64) PersistedDDLEv
 	return ddlEvent
 }
 
+func readPersistedDDLEvent(snap *pebble.Snapshot, version uint64) PersistedDDLEvent {
+	ddlKey, err := ddlJobKey(version)
+	if err != nil {
+		log.Fatal("generate ddl job key failed", zap.Error(err))
+	}
+	ddlValue, closer, err := snap.Get(ddlKey)
+	if err != nil {
+		log.Fatal("get ddl job failed", zap.Error(err))
+	}
+	defer closer.Close()
+	return unmarshalPersistedDDLEvent(ddlValue)
+}
+
 func writePersistedDDLEvent(db *pebble.DB, ddlEvent *PersistedDDLEvent) error {
 	batch := db.NewBatch()
 	ddlKey, err := ddlJobKey(ddlEvent.FinishedTs)
@@ -584,14 +580,7 @@ func loadAllPhysicalTablesAtTs(
 	}
 	defer snapIter.Close()
 	for snapIter.First(); snapIter.Valid(); snapIter.Next() {
-		var ddlEvent PersistedDDLEvent
-		if _, err = ddlEvent.UnmarshalMsg(snapIter.Value()); err != nil {
-			log.Fatal("unmarshal ddl job failed", zap.Error(err))
-		}
-		ddlEvent.TableInfo = &model.TableInfo{}
-		if err := json.Unmarshal(ddlEvent.TableInfoValue, &ddlEvent.TableInfo); err != nil {
-			log.Fatal("unmarshal table info failed", zap.Error(err))
-		}
+		ddlEvent := unmarshalPersistedDDLEvent(snapIter.Value())
 		if err := updateDatabaseInfoAndTableInfo(&ddlEvent, databaseMap, tableMap, partitionMap); err != nil {
 			log.Panic("updateDatabaseInfo error", zap.Error(err))
 		}
@@ -625,5 +614,7 @@ func loadAllPhysicalTablesAtTs(
 			})
 		}
 	}
+	log.Info("loadAllPhysicalTablesAtTs",
+		zap.Int("tableLen", len(tables)))
 	return tables, nil
 }
diff --git a/logservice/schemastore/multi_version.go b/logservice/schemastore/multi_version.go
index 7895c3a25..14e7bdb1a 100644
--- a/logservice/schemastore/multi_version.go
+++ b/logservice/schemastore/multi_version.go
@@ -243,9 +243,9 @@ func (v *versionedTableInfoStore) doApplyDDL(event *PersistedDDLEvent) {
 		assertNonEmpty(v.infos, event)
 		appendTableInfo()
 	case model.ActionTruncateTable:
-		if isPartitionTableEvent(event) {
+		if isPartitionTable(event.TableInfo) {
 			createTable := false
-			for _, partition := range getAllPartitionIDs(event) {
+			for _, partition := range getAllPartitionIDs(event.TableInfo) {
 				if v.tableID == partition {
 					createTable = true
 					break
@@ -271,7 +271,7 @@ func (v *versionedTableInfoStore) doApplyDDL(event *PersistedDDLEvent) {
 		assertNonEmpty(v.infos, event)
 		appendTableInfo()
 	case model.ActionAddTablePartition:
-		newCreatedIDs := getCreatedIDs(event.PrevPartitions, getAllPartitionIDs(event))
+		newCreatedIDs := getCreatedIDs(event.PrevPartitions, getAllPartitionIDs(event.TableInfo))
 		for _, partition := range newCreatedIDs {
 			if v.tableID == partition {
 				appendTableInfo()
@@ -279,7 +279,7 @@ func (v *versionedTableInfoStore) doApplyDDL(event *PersistedDDLEvent) {
 			}
 		}
 	case model.ActionDropTablePartition:
-		droppedIDs := getDroppedIDs(event.PrevPartitions, getAllPartitionIDs(event))
+		droppedIDs := getDroppedIDs(event.PrevPartitions, getAllPartitionIDs(event.TableInfo))
 		for _, partition := range droppedIDs {
 			if v.tableID == partition {
 				v.deleteVersion = uint64(event.FinishedTs)
@@ -287,7 +287,7 @@ func (v *versionedTableInfoStore) doApplyDDL(event *PersistedDDLEvent) {
 			}
 		}
 	case model.ActionTruncateTablePartition:
-		physicalIDs := getAllPartitionIDs(event)
+		physicalIDs := getAllPartitionIDs(event.TableInfo)
 		droppedIDs := getDroppedIDs(event.PrevPartitions, physicalIDs)
 		dropped := false
 		for _, partition := range droppedIDs {
@@ -324,15 +324,27 @@ func (v *versionedTableInfoStore) doApplyDDL(event *PersistedDDLEvent) {
 	case model.ActionCreateTables:
 		assertEmpty(v.infos, event)
 		for _, tableInfo := range event.MultipleTableInfos {
-			if v.tableID == tableInfo.ID {
-				info := common.WrapTableInfo(event.CurrentSchemaID, event.CurrentSchemaName, event.FinishedTs, tableInfo)
-				info.InitPreSQLs()
-				v.infos = append(v.infos, &tableInfoItem{version: uint64(event.FinishedTs), info: info})
-				break
+			if isPartitionTable(tableInfo) {
+				for _, partitionID := range getAllPartitionIDs(tableInfo) {
+					if v.tableID == partitionID {
+						info := common.WrapTableInfo(event.CurrentSchemaID, event.CurrentSchemaName, event.FinishedTs, tableInfo)
+						info.InitPreSQLs()
+						v.infos = append(v.infos, &tableInfoItem{version: uint64(event.FinishedTs), info: info})
+						break
+					}
+				}
+			} else {
+				if v.tableID == tableInfo.ID {
+					info := common.WrapTableInfo(event.CurrentSchemaID, event.CurrentSchemaName, event.FinishedTs, tableInfo)
+					info.InitPreSQLs()
+					v.infos = append(v.infos, &tableInfoItem{version: uint64(event.FinishedTs), info: info})
+					break
+				}
 			}
+		}
 	case model.ActionReorganizePartition:
-		physicalIDs := getAllPartitionIDs(event)
+		physicalIDs := getAllPartitionIDs(event.TableInfo)
 		droppedIDs := getDroppedIDs(event.PrevPartitions, physicalIDs)
 		dropped := false
 		for _, partition := range droppedIDs {
diff --git a/logservice/schemastore/persist_storage.go b/logservice/schemastore/persist_storage.go
index ef750ee89..c73da2426 100644
--- a/logservice/schemastore/persist_storage.go
+++ b/logservice/schemastore/persist_storage.go
@@ -134,6 +134,7 @@ func newPersistentStorage(
 		gcTs:                   gcTs,
 		upperBound:             upperBound,
 		tableMap:               make(map[int64]*BasicTableInfo),
+		partitionMap:           make(map[int64]BasicPartitionInfo),
 		databaseMap:            make(map[int64]*BasicDatabaseInfo),
 		tablesDDLHistory:       make(map[int64][]uint64),
 		tableTriggerDDLHistory: make([]uint64, 0),
@@ -606,6 +607,7 @@ func (p *persistentStorage) handleDDLJob(job *model.Job) error {
 		zap.Int64("schemaID", ddlEvent.CurrentSchemaID),
 		zap.Int64("tableID", ddlEvent.CurrentTableID),
 		zap.Uint64("finishedTs", ddlEvent.FinishedTs),
+		zap.Int64("schemaVersion", ddlEvent.SchemaVersion),
 		zap.String("ddlType", model.ActionType(ddlEvent.Type).String()),
 		zap.String("query", ddlEvent.Query))
 
@@ -710,7 +712,7 @@ func buildPersistedDDLEventFromJob(
 		event.CurrentTableID = event.TableInfo.ID
 		event.CurrentSchemaName = getSchemaName(event.CurrentSchemaID)
 		event.CurrentTableName = getTableName(event.PrevTableID)
-		if isPartitionTableEvent(&event) {
+		if isPartitionTable(event.TableInfo) {
 			for id := range partitionMap[event.PrevTableID] {
 				event.PrevPartitions = append(event.PrevPartitions, id)
 			}
@@ -816,14 +818,14 @@ func shouldSkipDDL(
 	return false
 }
 
-func isPartitionTableEvent(ddlEvent *PersistedDDLEvent) bool {
-	// ddlEvent.TableInfo may only be nil in unit test
-	return ddlEvent.TableInfo != nil && ddlEvent.TableInfo.Partition != nil
+func isPartitionTable(tableInfo *model.TableInfo) bool {
+	// tableInfo may only be nil in unit test
+	return tableInfo != nil && tableInfo.Partition != nil
 }
 
-func getAllPartitionIDs(ddlEvent *PersistedDDLEvent) []int64 {
-	physicalIDs := make([]int64, 0, len(ddlEvent.TableInfo.Partition.Definitions))
-	for _, partition := range ddlEvent.TableInfo.Partition.Definitions {
+func getAllPartitionIDs(tableInfo *model.TableInfo) []int64 {
+	physicalIDs := make([]int64, 0, len(tableInfo.Partition.Definitions))
+	for _, partition := range tableInfo.Partition.Definitions {
 		physicalIDs = append(physicalIDs, partition.ID)
 	}
 	return physicalIDs
@@ -858,9 +860,9 @@ func updateDDLHistory(
 		tableTriggerDDLHistory = append(tableTriggerDDLHistory, ddlEvent.FinishedTs)
 		// Note: for create table, this ddl event will not be sent to table dispatchers.
 		// add it to ddl history is just for building table info store.
-		if isPartitionTableEvent(ddlEvent) {
+		if isPartitionTable(ddlEvent.TableInfo) {
 			// for partition table, we only care the ddl history of physical table ids.
-			appendPartitionsHistory(getAllPartitionIDs(ddlEvent))
+			appendPartitionsHistory(getAllPartitionIDs(ddlEvent.TableInfo))
 		} else {
 			appendTableHistory(ddlEvent.CurrentTableID)
 		}
@@ -870,14 +872,14 @@ func updateDDLHistory(
 		model.ActionDropIndex,
 		model.ActionAddForeignKey,
 		model.ActionDropForeignKey:
-		if isPartitionTableEvent(ddlEvent) {
-			appendPartitionsHistory(getAllPartitionIDs(ddlEvent))
+		if isPartitionTable(ddlEvent.TableInfo) {
+			appendPartitionsHistory(getAllPartitionIDs(ddlEvent.TableInfo))
 		} else {
 			appendTableHistory(ddlEvent.CurrentTableID)
 		}
 	case model.ActionTruncateTable:
-		if isPartitionTableEvent(ddlEvent) {
-			appendPartitionsHistory(getAllPartitionIDs(ddlEvent))
+		if isPartitionTable(ddlEvent.TableInfo) {
+			appendPartitionsHistory(getAllPartitionIDs(ddlEvent.TableInfo))
 			appendPartitionsHistory(ddlEvent.PrevPartitions)
 		} else {
 			appendTableHistory(ddlEvent.CurrentTableID)
@@ -885,15 +887,15 @@ func updateDDLHistory(
 		}
 	case model.ActionModifyColumn,
 		model.ActionRebaseAutoID:
-		if isPartitionTableEvent(ddlEvent) {
-			appendPartitionsHistory(getAllPartitionIDs(ddlEvent))
+		if isPartitionTable(ddlEvent.TableInfo) {
+			appendPartitionsHistory(getAllPartitionIDs(ddlEvent.TableInfo))
 		} else {
 			appendTableHistory(ddlEvent.CurrentTableID)
 		}
 	case model.ActionRenameTable:
 		tableTriggerDDLHistory = append(tableTriggerDDLHistory, ddlEvent.FinishedTs)
-		if isPartitionTableEvent(ddlEvent) {
-			appendPartitionsHistory(getAllPartitionIDs(ddlEvent))
+		if isPartitionTable(ddlEvent.TableInfo) {
+			appendPartitionsHistory(getAllPartitionIDs(ddlEvent.TableInfo))
 		} else {
 			appendTableHistory(ddlEvent.CurrentTableID)
 		}
@@ -901,14 +903,14 @@ func updateDDLHistory(
 		model.ActionShardRowID,
 		model.ActionModifyTableComment,
 		model.ActionRenameIndex:
-		if isPartitionTableEvent(ddlEvent) {
-			appendPartitionsHistory(getAllPartitionIDs(ddlEvent))
+		if isPartitionTable(ddlEvent.TableInfo) {
+			appendPartitionsHistory(getAllPartitionIDs(ddlEvent.TableInfo))
 		} else {
 			appendTableHistory(ddlEvent.CurrentTableID)
 		}
 	case model.ActionAddTablePartition:
 		// all partitions include newly create partitions will receive this event
-		appendPartitionsHistory(getAllPartitionIDs(ddlEvent))
+		appendPartitionsHistory(getAllPartitionIDs(ddlEvent.TableInfo))
 	case model.ActionDropTablePartition:
 		// TODO: verify all partitions include dropped partitions will receive this event
 		appendPartitionsHistory(ddlEvent.PrevPartitions)
@@ -919,10 +921,10 @@ func updateDDLHistory(
 		}
 	case model.ActionTruncateTablePartition:
 		appendPartitionsHistory(ddlEvent.PrevPartitions)
-		newCreateIDs := getCreatedIDs(ddlEvent.PrevPartitions, getAllPartitionIDs(ddlEvent))
+		newCreateIDs := getCreatedIDs(ddlEvent.PrevPartitions, getAllPartitionIDs(ddlEvent.TableInfo))
 		appendPartitionsHistory(newCreateIDs)
 	case model.ActionExchangeTablePartition:
-		droppedIDs := getDroppedIDs(ddlEvent.PrevPartitions, getAllPartitionIDs(ddlEvent))
+		droppedIDs := getDroppedIDs(ddlEvent.PrevPartitions, getAllPartitionIDs(ddlEvent.TableInfo))
 		if len(droppedIDs) != 1 {
 			log.Panic("exchange table partition should only drop one partition",
 				zap.Int64s("droppedIDs", droppedIDs))
@@ -933,11 +935,16 @@ func updateDDLHistory(
 		tableTriggerDDLHistory = append(tableTriggerDDLHistory, ddlEvent.FinishedTs)
 		// it won't be send to table dispatchers, just for build version store
 		for _, info := range ddlEvent.MultipleTableInfos {
-			appendTableHistory(info.ID)
+			if isPartitionTable(info) {
+				// for partition table, we only care the ddl history of physical table ids.
+				appendPartitionsHistory(getAllPartitionIDs(info))
+			} else {
+				appendTableHistory(info.ID)
+			}
 		}
 	case model.ActionReorganizePartition:
 		appendPartitionsHistory(ddlEvent.PrevPartitions)
-		newCreateIDs := getCreatedIDs(ddlEvent.PrevPartitions, getAllPartitionIDs(ddlEvent))
+		newCreateIDs := getCreatedIDs(ddlEvent.PrevPartitions, getAllPartitionIDs(ddlEvent.TableInfo))
 		appendPartitionsHistory(newCreateIDs)
 	default:
 		log.Panic("unknown ddl type",
@@ -1010,16 +1017,16 @@ func updateDatabaseInfoAndTableInfo(
 		delete(databaseMap, event.CurrentSchemaID)
 	case model.ActionCreateTable:
 		createTable(event.CurrentSchemaID, event.CurrentTableID)
-		if isPartitionTableEvent(event) {
+		if isPartitionTable(event.TableInfo) {
 			partitionInfo := make(BasicPartitionInfo)
-			for _, partition := range event.TableInfo.Partition.Definitions {
-				partitionInfo[partition.ID] = nil
+			for _, id := range getAllPartitionIDs(event.TableInfo) {
+				partitionInfo[id] = nil
 			}
 			partitionMap[event.CurrentTableID] = partitionInfo
 		}
 	case model.ActionDropTable:
 		dropTable(event.CurrentSchemaID, event.CurrentTableID)
-		if isPartitionTableEvent(event) {
+		if isPartitionTable(event.TableInfo) {
 			delete(partitionMap, event.CurrentTableID)
 		}
 	case model.ActionAddColumn,
@@ -1032,11 +1039,11 @@ func updateDatabaseInfoAndTableInfo(
 	case model.ActionTruncateTable:
 		dropTable(event.CurrentSchemaID, event.PrevTableID)
 		createTable(event.CurrentSchemaID, event.CurrentTableID)
-		if isPartitionTableEvent(event) {
+		if isPartitionTable(event.TableInfo) {
 			delete(partitionMap, event.PrevTableID)
 			partitionInfo := make(BasicPartitionInfo)
-			for _, partition := range event.TableInfo.Partition.Definitions {
-				partitionInfo[partition.ID] = nil
+			for _, id := range getAllPartitionIDs(event.TableInfo) {
+				partitionInfo[id] = nil
 			}
 			partitionMap[event.CurrentTableID] = partitionInfo
 		}
@@ -1056,19 +1063,19 @@ func updateDatabaseInfoAndTableInfo(
 		model.ActionRenameIndex:
 		// TODO: verify can be ignored
 	case model.ActionAddTablePartition:
-		newCreatedIDs := getCreatedIDs(event.PrevPartitions, getAllPartitionIDs(event))
+		newCreatedIDs := getCreatedIDs(event.PrevPartitions, getAllPartitionIDs(event.TableInfo))
 		for _, id := range newCreatedIDs {
 			partitionMap[event.CurrentTableID][id] = nil
 		}
 	case model.ActionDropTablePartition:
-		droppedIDs := getDroppedIDs(event.PrevPartitions, getAllPartitionIDs(event))
+		droppedIDs := getDroppedIDs(event.PrevPartitions, getAllPartitionIDs(event.TableInfo))
 		for _, id := range droppedIDs {
 			delete(partitionMap[event.CurrentTableID], id)
 		}
 	case model.ActionCreateView:
 		// ignore
 	case model.ActionTruncateTablePartition:
-		physicalIDs := getAllPartitionIDs(event)
+		physicalIDs := getAllPartitionIDs(event.TableInfo)
 		droppedIDs := getDroppedIDs(event.PrevPartitions, physicalIDs)
 		for _, id := range droppedIDs {
 			delete(partitionMap[event.CurrentTableID], id)
@@ -1078,7 +1085,7 @@ func updateDatabaseInfoAndTableInfo(
 			partitionMap[event.CurrentTableID][id] = nil
 		}
 	case model.ActionExchangeTablePartition:
-		physicalIDs := getAllPartitionIDs(event)
+		physicalIDs := getAllPartitionIDs(event.TableInfo)
 		droppedIDs := getDroppedIDs(event.PrevPartitions, physicalIDs)
 		if len(droppedIDs) != 1 {
 			log.Panic("exchange table partition should only drop one partition",
@@ -1090,15 +1097,26 @@ func updateDatabaseInfoAndTableInfo(
 		delete(partitionMap[event.CurrentTableID], targetPartitionID)
 		partitionMap[event.CurrentTableID][event.PrevTableID] = nil
 	case model.ActionCreateTables:
+		if event.MultipleTableInfos == nil {
+			log.Panic("multiple table infos should not be nil")
+		}
 		for _, info := range event.MultipleTableInfos {
 			addTableToDB(event.CurrentSchemaID, info.ID)
 			tableMap[info.ID] = &BasicTableInfo{
 				SchemaID: event.CurrentSchemaID,
 				Name:     info.Name.O,
 			}
+			if isPartitionTable(info) {
+				partitionInfo := make(BasicPartitionInfo)
+				for _, id := range getAllPartitionIDs(info) {
+					partitionInfo[id] = nil
+				}
+				partitionMap[info.ID] = partitionInfo
+			}
 		}
+
 	case model.ActionReorganizePartition:
-		physicalIDs := getAllPartitionIDs(event)
+		physicalIDs := getAllPartitionIDs(event.TableInfo)
 		droppedIDs := getDroppedIDs(event.PrevPartitions, physicalIDs)
 		for _, id := range droppedIDs {
 			delete(partitionMap[event.CurrentTableID], id)
@@ -1121,8 +1139,8 @@ func updateRegisteredTableInfoStore(
 	tableInfoStoreMap map[int64]*versionedTableInfoStore,
 ) error {
 	tryApplyDDLToStore := func() {
-		if isPartitionTableEvent(event) {
-			allPhysicalIDs := getAllPartitionIDs(event)
+		if isPartitionTable(event.TableInfo) {
+			allPhysicalIDs := getAllPartitionIDs(event.TableInfo)
 			for _, id := range allPhysicalIDs {
 				if store, ok := tableInfoStoreMap[id]; ok {
 					store.applyDDL(event)
@@ -1140,8 +1158,8 @@ func updateRegisteredTableInfoStore(
 		model.ActionDropSchema:
 		// ignore
 	case model.ActionCreateTable:
-		if isPartitionTableEvent(event) {
-			allPhysicalIDs := getAllPartitionIDs(event)
+		if isPartitionTable(event.TableInfo) {
+			allPhysicalIDs := getAllPartitionIDs(event.TableInfo)
 			for _, id := range allPhysicalIDs {
 				if _, ok := tableInfoStoreMap[event.CurrentTableID]; ok {
 					log.Panic("newly created tables should not be registered",
@@ -1164,13 +1182,13 @@ func updateRegisteredTableInfoStore(
 		model.ActionDropForeignKey:
 		// ignore
 	case model.ActionTruncateTable:
-		if isPartitionTableEvent(event) {
+		if isPartitionTable(event.TableInfo) {
 			for _, id := range event.PrevPartitions {
 				if store, ok := tableInfoStoreMap[id]; ok {
 					store.applyDDL(event)
 				}
 			}
-			allPhysicalIDs := getAllPartitionIDs(event)
+			allPhysicalIDs := getAllPartitionIDs(event.TableInfo)
 			for _, id := range allPhysicalIDs {
 				if _, ok := tableInfoStoreMap[id]; ok {
 					log.Panic("newly created tables should not be registered",
@@ -1199,7 +1217,7 @@ func updateRegisteredTableInfoStore(
 		model.ActionRenameIndex:
 		// TODO: verify can be ignored
 	case model.ActionAddTablePartition:
-		newCreatedIDs := getCreatedIDs(event.PrevPartitions, getAllPartitionIDs(event))
+		newCreatedIDs := getCreatedIDs(event.PrevPartitions, getAllPartitionIDs(event.TableInfo))
 		for _, id := range newCreatedIDs {
 			if _, ok := tableInfoStoreMap[id]; ok {
 				log.Panic("newly created partitions should not be registered",
@@ -1207,7 +1225,7 @@ func updateRegisteredTableInfoStore(
 			}
 		}
 	case model.ActionDropTablePartition:
-		droppedIDs := getDroppedIDs(event.PrevPartitions, getAllPartitionIDs(event))
+		droppedIDs := getDroppedIDs(event.PrevPartitions, getAllPartitionIDs(event.TableInfo))
 		for _, id := range droppedIDs {
 			if store, ok := tableInfoStoreMap[id]; ok {
 				store.applyDDL(event)
@@ -1216,7 +1234,7 @@ func updateRegisteredTableInfoStore(
 	case model.ActionCreateView:
 		// ignore
 	case model.ActionTruncateTablePartition:
-		physicalIDs := getAllPartitionIDs(event)
+		physicalIDs := getAllPartitionIDs(event.TableInfo)
 		droppedIDs := getDroppedIDs(event.PrevPartitions, physicalIDs)
 		for _, id := range droppedIDs {
 			if store, ok := tableInfoStoreMap[id]; ok {
@@ -1231,7 +1249,7 @@ func updateRegisteredTableInfoStore(
 			}
 		}
 	case model.ActionExchangeTablePartition:
-		physicalIDs := getAllPartitionIDs(event)
+		physicalIDs := getAllPartitionIDs(event.TableInfo)
 		droppedIDs := getDroppedIDs(event.PrevPartitions, physicalIDs)
 		if len(droppedIDs) != 1 {
 			log.Panic("exchange table partition should only drop one partition",
@@ -1246,13 +1264,23 @@ func updateRegisteredTableInfoStore(
 		}
 	case model.ActionCreateTables:
 		for _, info := range event.MultipleTableInfos {
-			if _, ok := tableInfoStoreMap[info.ID]; ok {
-				log.Panic("newly created tables should not be registered",
-					zap.Int64("tableID", info.ID))
+			if isPartitionTable(info) {
+				for _, id := range getAllPartitionIDs(info) {
+					if _, ok := tableInfoStoreMap[id]; ok {
+						log.Panic("newly created tables should not be registered",
+							zap.Int64("tableID", id))
+					}
+				}
+			} else {
+				if _, ok := tableInfoStoreMap[info.ID]; ok {
+					log.Panic("newly created tables should not be registered",
+						zap.Int64("tableID", info.ID))
+				}
 			}
+		}
 	case model.ActionReorganizePartition:
-		physicalIDs := getAllPartitionIDs(event)
+		physicalIDs := getAllPartitionIDs(event.TableInfo)
 		droppedIDs := getDroppedIDs(event.PrevPartitions, physicalIDs)
 		for _, id := range droppedIDs {
 			if store, ok := tableInfoStoreMap[id]; ok {
@@ -1318,8 +1346,8 @@ func buildDDLEvent(rawEvent *PersistedDDLEvent, tableFilter filter.Filter) commo
 			DropDatabaseName: rawEvent.CurrentSchemaName,
 		}
 	case model.ActionCreateTable:
-		if isPartitionTableEvent(rawEvent) {
-			physicalIDs := getAllPartitionIDs(rawEvent)
+		if isPartitionTable(rawEvent.TableInfo) {
+			physicalIDs := getAllPartitionIDs(rawEvent.TableInfo)
 			ddlEvent.NeedAddedTables = make([]common.Table, 0, len(physicalIDs))
 			for _, id := range physicalIDs {
 				ddlEvent.NeedAddedTables = append(ddlEvent.NeedAddedTables, common.Table{
@@ -1344,8 +1372,8 @@ func buildDDLEvent(rawEvent *PersistedDDLEvent, tableFilter filter.Filter) commo
 			},
 		}
 	case model.ActionDropTable:
-		if isPartitionTableEvent(rawEvent) {
-			allPhysicalTableIDs := getAllPartitionIDs(rawEvent)
+		if isPartitionTable(rawEvent.TableInfo) {
+			allPhysicalTableIDs := getAllPartitionIDs(rawEvent.TableInfo)
 			allPhysicalTableIDsAndDDLSpanID := make([]int64, 0, len(rawEvent.TableInfo.Partition.Definitions)+1)
 			allPhysicalTableIDsAndDDLSpanID = append(allPhysicalTableIDsAndDDLSpanID, allPhysicalTableIDs...)
 			allPhysicalTableIDsAndDDLSpanID = append(allPhysicalTableIDsAndDDLSpanID, heartbeatpb.DDLSpan.TableID)
@@ -1383,7 +1411,7 @@
 		model.ActionDropForeignKey:
 		// ignore
 	case model.ActionTruncateTable:
-		if isPartitionTableEvent(rawEvent) {
+		if isPartitionTable(rawEvent.TableInfo) {
 			if len(rawEvent.PrevPartitions) > 1 {
 				// if more than one partitions, we need block them
 				ddlEvent.BlockedTables = &common.InfluencedTables{
@@ -1396,7 +1424,7 @@
 				InfluenceType: common.InfluenceTypeNormal,
 				TableIDs:      rawEvent.PrevPartitions,
 			}
-			physicalIDs := getAllPartitionIDs(rawEvent)
+			physicalIDs := getAllPartitionIDs(rawEvent.TableInfo)
 			ddlEvent.NeedAddedTables = make([]common.Table, 0, len(physicalIDs))
 			for _, id := range physicalIDs {
 				ddlEvent.NeedAddedTables = append(ddlEvent.NeedAddedTables, common.Table{
@@ -1422,8 +1450,8 @@
 	case model.ActionRenameTable:
 		ignorePrevTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.PrevSchemaName, rawEvent.PrevTableName)
 		ignoreCurrentTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.CurrentSchemaName, rawEvent.CurrentTableName)
-		if isPartitionTableEvent(rawEvent) {
-			allPhysicalIDs := getAllPartitionIDs(rawEvent)
+		if isPartitionTable(rawEvent.TableInfo) {
+			allPhysicalIDs := getAllPartitionIDs(rawEvent.TableInfo)
 			if !ignorePrevTable {
 				allPhysicalIDsAndDDLSpanID := make([]int64, 0, len(allPhysicalIDs)+1)
 				allPhysicalIDsAndDDLSpanID = append(allPhysicalIDsAndDDLSpanID, allPhysicalIDs...)
@@ -1558,7 +1586,7 @@
 				TableIDs:      rawEvent.PrevPartitions,
 			}
 		}
-		physicalIDs := getAllPartitionIDs(rawEvent)
+		physicalIDs := getAllPartitionIDs(rawEvent.TableInfo)
 		newCreatedIDs := getCreatedIDs(rawEvent.PrevPartitions, physicalIDs)
 		ddlEvent.NeedAddedTables = make([]common.Table, 0, len(newCreatedIDs))
 		for _, id := range newCreatedIDs {
@@ -1574,7 +1602,7 @@
 				TableIDs:      rawEvent.PrevPartitions,
 			}
 		}
-		physicalIDs := getAllPartitionIDs(rawEvent)
+		physicalIDs := getAllPartitionIDs(rawEvent.TableInfo)
 		droppedIDs := getDroppedIDs(rawEvent.PrevPartitions, physicalIDs)
 		ddlEvent.NeedDroppedTables = &common.InfluencedTables{
 			InfluenceType: common.InfluenceTypeNormal,
@@ -1591,7 +1619,7 @@
 				TableIDs:      rawEvent.PrevPartitions,
 			}
 		}
-		physicalIDs := getAllPartitionIDs(rawEvent)
+		physicalIDs := getAllPartitionIDs(rawEvent.TableInfo)
 		newCreatedIDs := getCreatedIDs(rawEvent.PrevPartitions, physicalIDs)
 		for _, id := range newCreatedIDs {
 			ddlEvent.NeedAddedTables = append(ddlEvent.NeedAddedTables, common.Table{
@@ -1607,7 +1635,7 @@
 	case model.ActionExchangeTablePartition:
 		ignoreNormalTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.PrevSchemaName, rawEvent.PrevTableName)
 		ignorePartitionTable := tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.CurrentSchemaName, rawEvent.CurrentTableName)
-		physicalIDs := getAllPartitionIDs(rawEvent)
+		physicalIDs := getAllPartitionIDs(rawEvent.TableInfo)
 		droppedIDs := getDroppedIDs(rawEvent.PrevPartitions, physicalIDs)
 		if len(droppedIDs) != 1 {
log.Panic("exchange table partition should only drop one partition", @@ -1659,21 +1687,43 @@ func buildDDLEvent(rawEvent *PersistedDDLEvent, tableFilter filter.Filter) commo log.Fatal("should not happen") } case model.ActionCreateTables: - ddlEvent.NeedAddedTables = make([]common.Table, 0, len(rawEvent.MultipleTableInfos)) - addName := make([]common.SchemaTableName, 0, len(rawEvent.MultipleTableInfos)) + physicalTableCount := 0 + logicalTableCount := 0 + for _, info := range rawEvent.MultipleTableInfos { + if tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.CurrentSchemaName, info.Name.O) { + continue + } + logicalTableCount += 1 + if isPartitionTable(info) { + physicalTableCount += len(info.Partition.Definitions) + } else { + physicalTableCount += 1 + } + } querys := strings.Split(rawEvent.Query, ";") - resultQuerys := make([]string, 0, len(rawEvent.MultipleTableInfos)) - for i := range rawEvent.MultipleTableInfos { - if tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.CurrentSchemaName, rawEvent.MultipleTableInfos[i].Name.O) { + ddlEvent.NeedAddedTables = make([]common.Table, 0, physicalTableCount) + addName := make([]common.SchemaTableName, 0, logicalTableCount) + resultQuerys := make([]string, 0, logicalTableCount) + for i, info := range rawEvent.MultipleTableInfos { + if tableFilter != nil && tableFilter.ShouldIgnoreTable(rawEvent.CurrentSchemaName, info.Name.O) { continue } - ddlEvent.NeedAddedTables = append(ddlEvent.NeedAddedTables, common.Table{ - SchemaID: rawEvent.CurrentSchemaID, - TableID: rawEvent.MultipleTableInfos[i].ID, - }) + if isPartitionTable(info) { + for _, partitionID := range getAllPartitionIDs(info) { + ddlEvent.NeedAddedTables = append(ddlEvent.NeedAddedTables, common.Table{ + SchemaID: rawEvent.CurrentSchemaID, + TableID: partitionID, + }) + } + } else { + ddlEvent.NeedAddedTables = append(ddlEvent.NeedAddedTables, common.Table{ + SchemaID: rawEvent.CurrentSchemaID, + TableID: info.ID, + }) + } addName = append(addName, common.SchemaTableName{ SchemaName: rawEvent.CurrentSchemaName, - TableName: rawEvent.MultipleTableInfos[i].Name.O, + TableName: info.Name.O, }) resultQuerys = append(resultQuerys, querys[i]) } @@ -1692,7 +1742,7 @@ func buildDDLEvent(rawEvent *PersistedDDLEvent, tableFilter filter.Filter) commo TableIDs: rawEvent.PrevPartitions, } } - physicalIDs := getAllPartitionIDs(rawEvent) + physicalIDs := getAllPartitionIDs(rawEvent.TableInfo) newCreatedIDs := getCreatedIDs(rawEvent.PrevPartitions, physicalIDs) for _, id := range newCreatedIDs { ddlEvent.NeedAddedTables = append(ddlEvent.NeedAddedTables, common.Table{ diff --git a/logservice/schemastore/persist_storage_test.go b/logservice/schemastore/persist_storage_test.go index bfed5655c..02bc1ccdf 100644 --- a/logservice/schemastore/persist_storage_test.go +++ b/logservice/schemastore/persist_storage_test.go @@ -38,6 +38,7 @@ func loadPersistentStorageForTest(db *pebble.DB, gcTs uint64, upperBound UpperBo gcTs: gcTs, upperBound: upperBound, tableMap: make(map[int64]*BasicTableInfo), + partitionMap: make(map[int64]BasicPartitionInfo), databaseMap: make(map[int64]*BasicDatabaseInfo), tablesDDLHistory: make(map[int64][]uint64), tableTriggerDDLHistory: make([]uint64, 0), @@ -1630,8 +1631,176 @@ func TestCreateTables(t *testing.T) { } } -func TestRenameTables(t *testing.T) { +func TestCreateTablesForPartitionTable(t *testing.T) { + dbPath := fmt.Sprintf("/tmp/testdb-%s", t.Name()) + err := os.RemoveAll(dbPath) + require.Nil(t, err) + + gcTs := uint64(500) + schemaID := int64(300) + + 
databaseInfo := make(map[int64]*model.DBInfo) + databaseInfo[schemaID] = &model.DBInfo{ + ID: schemaID, + Name: model.NewCIStr("test"), + } + pStorage := newPersistentStorageForTest(dbPath, gcTs, databaseInfo) + + // create tables + tableID1 := int64(100) + tableID2 := tableID1 + 100 + tableID3 := tableID1 + 200 + partitionID1 := tableID1 + 1000 + partitionID2 := partitionID1 + 100 + partitionID3 := partitionID1 + 200 + partitionID4 := partitionID1 + 300 + partitionID5 := partitionID1 + 400 + partitionID6 := partitionID1 + 500 + { + job := &model.Job{ + Type: model.ActionCreateTables, + SchemaID: schemaID, + Query: "sql1;sql2;sql3", + BinlogInfo: &model.HistoryInfo{ + SchemaVersion: 501, + MultipleTableInfos: []*model.TableInfo{ + { + ID: tableID1, + Name: model.NewCIStr("t1"), + Partition: &model.PartitionInfo{ + Definitions: []model.PartitionDefinition{ + { + ID: partitionID1, + }, + { + ID: partitionID2, + }, + }, + }, + }, + { + ID: tableID2, + Name: model.NewCIStr("t2"), + Partition: &model.PartitionInfo{ + Definitions: []model.PartitionDefinition{ + { + ID: partitionID3, + }, + { + ID: partitionID4, + }, + }, + }, + }, + { + ID: tableID3, + Name: model.NewCIStr("t3"), + Partition: &model.PartitionInfo{ + Definitions: []model.PartitionDefinition{ + { + ID: partitionID5, + }, + { + ID: partitionID6, + }, + }, + }, + }, + }, + FinishedTS: 601, + }, + } + pStorage.handleDDLJob(job) + } + + { + require.Equal(t, 3, len(pStorage.databaseMap[schemaID].Tables)) + require.Equal(t, 3, len(pStorage.tableMap)) + require.Equal(t, 3, len(pStorage.partitionMap)) + } + + { + store := newEmptyVersionedTableInfoStore(partitionID1) + pStorage.buildVersionedTableInfoStore(store) + require.Equal(t, 1, len(store.infos)) + require.Equal(t, "t1", store.infos[0].info.Name.O) + } + + { + store := newEmptyVersionedTableInfoStore(partitionID2) + pStorage.buildVersionedTableInfoStore(store) + require.Equal(t, 1, len(store.infos)) + require.Equal(t, "t1", store.infos[0].info.Name.O) + } + + { + store := newEmptyVersionedTableInfoStore(partitionID3) + pStorage.buildVersionedTableInfoStore(store) + require.Equal(t, 1, len(store.infos)) + require.Equal(t, "t2", store.infos[0].info.Name.O) + } + + { + store := newEmptyVersionedTableInfoStore(partitionID4) + pStorage.buildVersionedTableInfoStore(store) + require.Equal(t, 1, len(store.infos)) + require.Equal(t, "t2", store.infos[0].info.Name.O) + } + + { + store := newEmptyVersionedTableInfoStore(partitionID5) + pStorage.buildVersionedTableInfoStore(store) + require.Equal(t, 1, len(store.infos)) + require.Equal(t, "t3", store.infos[0].info.Name.O) + } + { + store := newEmptyVersionedTableInfoStore(partitionID6) + pStorage.buildVersionedTableInfoStore(store) + require.Equal(t, 1, len(store.infos)) + require.Equal(t, "t3", store.infos[0].info.Name.O) + } + + { + ddlEvents, err := pStorage.fetchTableTriggerDDLEvents(nil, 600, 601) + require.Nil(t, err) + require.Equal(t, 1, len(ddlEvents)) + + verifyTableIsAdded(t, ddlEvents[0], partitionID1, schemaID) + verifyTableIsAdded(t, ddlEvents[0], partitionID2, schemaID) + verifyTableIsAdded(t, ddlEvents[0], partitionID3, schemaID) + verifyTableIsAdded(t, ddlEvents[0], partitionID4, schemaID) + verifyTableIsAdded(t, ddlEvents[0], partitionID5, schemaID) + verifyTableIsAdded(t, ddlEvents[0], partitionID6, schemaID) + } + + // filter t2 and t3 + { + filterConfig := &config.FilterConfig{ + Rules: []string{"test.t1"}, + } + tableFilter, err := filter.NewFilter(filterConfig, "", false) + require.Nil(t, err) + ddlEvents, err := 
pStorage.fetchTableTriggerDDLEvents(tableFilter, 600, 601) + require.Nil(t, err) + require.Equal(t, 1, len(ddlEvents)) + + verifyTableIsAdded(t, ddlEvents[0], partitionID1, schemaID) + verifyTableIsAdded(t, ddlEvents[0], partitionID2, schemaID) + require.Equal(t, 2, len(ddlEvents[0].NeedAddedTables)) + } + + // all filtered out + { + filterConfig := &config.FilterConfig{ + Rules: []string{"test2.*"}, + } + tableFilter, err := filter.NewFilter(filterConfig, "", false) + require.Nil(t, err) + ddlEvents, err := pStorage.fetchTableTriggerDDLEvents(tableFilter, 600, 601) + require.Nil(t, err) + require.Equal(t, 0, len(ddlEvents)) + } } func TestFetchDDLEventsBasic(t *testing.T) {
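
Reviewer note (not part of the patch): the heart of this change is that a single ActionCreateTables event now fans out to one entry per physical table, i.e. per partition ID, instead of one entry per logical table. Below is a minimal, self-contained Go sketch of that fan-out. The TableInfo/PartitionInfo structs here are simplified stand-ins for the TiDB model types the patch uses, and the helper names mirror the patch's isPartitionTable/getAllPartitionIDs; it is an illustration, not the actual schemastore code.

    package main

    import "fmt"

    // Simplified stand-ins for the TiDB model types referenced in the patch.
    type PartitionDefinition struct{ ID int64 }
    type PartitionInfo struct{ Definitions []PartitionDefinition }
    type TableInfo struct {
        ID        int64
        Partition *PartitionInfo // nil for a non-partitioned table
    }

    // isPartitionTable mirrors the patch's helper: a table is a partition table
    // when its TableInfo carries partition metadata.
    func isPartitionTable(t *TableInfo) bool { return t != nil && t.Partition != nil }

    // getAllPartitionIDs mirrors the patch's helper: collect the physical IDs of
    // every partition definition.
    func getAllPartitionIDs(t *TableInfo) []int64 {
        ids := make([]int64, 0, len(t.Partition.Definitions))
        for _, def := range t.Partition.Definitions {
            ids = append(ids, def.ID)
        }
        return ids
    }

    func main() {
        // One CREATE TABLES job: table 100 is partitioned (two physical tables),
        // table 200 is not.
        infos := []*TableInfo{
            {ID: 100, Partition: &PartitionInfo{Definitions: []PartitionDefinition{{ID: 1100}, {ID: 1200}}}},
            {ID: 200},
        }
        // Fan out to physical table IDs, in the same way updateDDLHistory and
        // buildDDLEvent now branch on isPartitionTable for ActionCreateTables.
        var physical []int64
        for _, info := range infos {
            if isPartitionTable(info) {
                physical = append(physical, getAllPartitionIDs(info)...)
            } else {
                physical = append(physical, info.ID)
            }
        }
        fmt.Println(physical) // [1100 1200 200]
    }

This is also why the new TestCreateTablesForPartitionTable asserts NeedAddedTables against partition IDs (partitionID1..partitionID6) rather than the logical table IDs.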