Skip to content

Commit

Permalink
schemastore: support create tables for partition table (pingcap#351)
Browse files Browse the repository at this point in the history
* initialize partition map

* add some log

* fix

* small refactor

* small fix

* support createt tables for partition table
  • Loading branch information
lidezhu authored Oct 8, 2024
1 parent 6683440 commit 579ea47
Show file tree
Hide file tree
Showing 4 changed files with 335 additions and 113 deletions.
47 changes: 19 additions & 28 deletions logservice/schemastore/disk_format.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,15 +271,7 @@ func loadAndApplyDDLHistory(
}
defer snapIter.Close()
for snapIter.First(); snapIter.Valid(); snapIter.Next() {
var ddlEvent PersistedDDLEvent
if _, err = ddlEvent.UnmarshalMsg(snapIter.Value()); err != nil {
log.Fatal("unmarshal ddl job failed", zap.Error(err))
}
ddlEvent.TableInfo = &model.TableInfo{}
if err := json.Unmarshal(ddlEvent.TableInfoValue, &ddlEvent.TableInfo); err != nil {
log.Fatal("unmarshal table info failed", zap.Error(err))
}

ddlEvent := unmarshalPersistedDDLEvent(snapIter.Value())
if shouldSkipDDL(&ddlEvent, databaseMap, tableMap) {
continue
}
Expand Down Expand Up @@ -326,18 +318,9 @@ func readTableInfoInKVSnap(snap *pebble.Snapshot, tableID int64, version uint64)
return common.WrapTableInfo(table_info_entry.SchemaID, table_info_entry.SchemaName, version, tableInfo)
}

func readPersistedDDLEvent(snap *pebble.Snapshot, version uint64) PersistedDDLEvent {
ddlKey, err := ddlJobKey(version)
if err != nil {
log.Fatal("generate ddl job key failed", zap.Error(err))
}
ddlValue, closer, err := snap.Get(ddlKey)
if err != nil {
log.Fatal("get ddl job failed", zap.Error(err))
}
defer closer.Close()
func unmarshalPersistedDDLEvent(value []byte) PersistedDDLEvent {
var ddlEvent PersistedDDLEvent
if _, err := ddlEvent.UnmarshalMsg(ddlValue); err != nil {
if _, err := ddlEvent.UnmarshalMsg(value); err != nil {
log.Fatal("unmarshal ddl job failed", zap.Error(err))
}

Expand All @@ -359,6 +342,19 @@ func readPersistedDDLEvent(snap *pebble.Snapshot, version uint64) PersistedDDLEv
return ddlEvent
}

func readPersistedDDLEvent(snap *pebble.Snapshot, version uint64) PersistedDDLEvent {
ddlKey, err := ddlJobKey(version)
if err != nil {
log.Fatal("generate ddl job key failed", zap.Error(err))
}
ddlValue, closer, err := snap.Get(ddlKey)
if err != nil {
log.Fatal("get ddl job failed", zap.Error(err))
}
defer closer.Close()
return unmarshalPersistedDDLEvent(ddlValue)
}

func writePersistedDDLEvent(db *pebble.DB, ddlEvent *PersistedDDLEvent) error {
batch := db.NewBatch()
ddlKey, err := ddlJobKey(ddlEvent.FinishedTs)
Expand Down Expand Up @@ -584,14 +580,7 @@ func loadAllPhysicalTablesAtTs(
}
defer snapIter.Close()
for snapIter.First(); snapIter.Valid(); snapIter.Next() {
var ddlEvent PersistedDDLEvent
if _, err = ddlEvent.UnmarshalMsg(snapIter.Value()); err != nil {
log.Fatal("unmarshal ddl job failed", zap.Error(err))
}
ddlEvent.TableInfo = &model.TableInfo{}
if err := json.Unmarshal(ddlEvent.TableInfoValue, &ddlEvent.TableInfo); err != nil {
log.Fatal("unmarshal table info failed", zap.Error(err))
}
ddlEvent := unmarshalPersistedDDLEvent(snapIter.Value())
if err := updateDatabaseInfoAndTableInfo(&ddlEvent, databaseMap, tableMap, partitionMap); err != nil {
log.Panic("updateDatabaseInfo error", zap.Error(err))
}
Expand Down Expand Up @@ -625,5 +614,7 @@ func loadAllPhysicalTablesAtTs(
})
}
}
log.Info("loadAllPhysicalTablesAtTs",
zap.Int("tableLen", len(tables)))
return tables, nil
}
34 changes: 23 additions & 11 deletions logservice/schemastore/multi_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,9 @@ func (v *versionedTableInfoStore) doApplyDDL(event *PersistedDDLEvent) {
assertNonEmpty(v.infos, event)
appendTableInfo()
case model.ActionTruncateTable:
if isPartitionTableEvent(event) {
if isPartitionTable(event.TableInfo) {
createTable := false
for _, partition := range getAllPartitionIDs(event) {
for _, partition := range getAllPartitionIDs(event.TableInfo) {
if v.tableID == partition {
createTable = true
break
Expand All @@ -271,23 +271,23 @@ func (v *versionedTableInfoStore) doApplyDDL(event *PersistedDDLEvent) {
assertNonEmpty(v.infos, event)
appendTableInfo()
case model.ActionAddTablePartition:
newCreatedIDs := getCreatedIDs(event.PrevPartitions, getAllPartitionIDs(event))
newCreatedIDs := getCreatedIDs(event.PrevPartitions, getAllPartitionIDs(event.TableInfo))
for _, partition := range newCreatedIDs {
if v.tableID == partition {
appendTableInfo()
break
}
}
case model.ActionDropTablePartition:
droppedIDs := getDroppedIDs(event.PrevPartitions, getAllPartitionIDs(event))
droppedIDs := getDroppedIDs(event.PrevPartitions, getAllPartitionIDs(event.TableInfo))
for _, partition := range droppedIDs {
if v.tableID == partition {
v.deleteVersion = uint64(event.FinishedTs)
break
}
}
case model.ActionTruncateTablePartition:
physicalIDs := getAllPartitionIDs(event)
physicalIDs := getAllPartitionIDs(event.TableInfo)
droppedIDs := getDroppedIDs(event.PrevPartitions, physicalIDs)
dropped := false
for _, partition := range droppedIDs {
Expand Down Expand Up @@ -324,15 +324,27 @@ func (v *versionedTableInfoStore) doApplyDDL(event *PersistedDDLEvent) {
case model.ActionCreateTables:
assertEmpty(v.infos, event)
for _, tableInfo := range event.MultipleTableInfos {
if v.tableID == tableInfo.ID {
info := common.WrapTableInfo(event.CurrentSchemaID, event.CurrentSchemaName, event.FinishedTs, tableInfo)
info.InitPreSQLs()
v.infos = append(v.infos, &tableInfoItem{version: uint64(event.FinishedTs), info: info})
break
if isPartitionTable(tableInfo) {
for _, partitionID := range getAllPartitionIDs(tableInfo) {
if v.tableID == partitionID {
info := common.WrapTableInfo(event.CurrentSchemaID, event.CurrentSchemaName, event.FinishedTs, tableInfo)
info.InitPreSQLs()
v.infos = append(v.infos, &tableInfoItem{version: uint64(event.FinishedTs), info: info})
break
}
}
} else {
if v.tableID == tableInfo.ID {
info := common.WrapTableInfo(event.CurrentSchemaID, event.CurrentSchemaName, event.FinishedTs, tableInfo)
info.InitPreSQLs()
v.infos = append(v.infos, &tableInfoItem{version: uint64(event.FinishedTs), info: info})
break
}
}

}
case model.ActionReorganizePartition:
physicalIDs := getAllPartitionIDs(event)
physicalIDs := getAllPartitionIDs(event.TableInfo)
droppedIDs := getDroppedIDs(event.PrevPartitions, physicalIDs)
dropped := false
for _, partition := range droppedIDs {
Expand Down
Loading

0 comments on commit 579ea47

Please sign in to comment.