Skip to content

Commit

Permalink
schemastore: support more DDLs (pingcap#350)
Browse files Browse the repository at this point in the history
* add schema update field and fix rename table

* small fix

* remove unnecessary schema id

* support some DDLs for partition tables

* add tests for partition table

* add tests

* partition management wip

* wip

* wip

* add more test

* fix exchange partition and add tests

* add log

* add some log

* fix nil pointer when initializing DDL table info

* support create tables
  • Loading branch information
lidezhu authored Oct 7, 2024
1 parent 4703f8b commit 6683440
Show file tree
Hide file tree
Showing 10 changed files with 2,270 additions and 244 deletions.
6 changes: 5 additions & 1 deletion logservice/schemastore/ddl_job_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/flowbehappy/tigate/pkg/common"
"github.com/flowbehappy/tigate/pkg/mounter"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/model"
Expand All @@ -31,6 +32,7 @@ import (
"github.com/pingcap/tiflow/pkg/security"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
)

type ddlJobFetcher struct {
Expand All @@ -56,7 +58,7 @@ func newDDLJobFetcher(
) *ddlJobFetcher {
clientConfig := &logpuller.SubscriptionClientConfig{
RegionRequestWorkerPerStore: 1,
ChangeEventProcessorNum: 4,
ChangeEventProcessorNum: 1, // must be 1, because ddlJobFetcher.input cannot be called concurrently
AdvanceResolvedTsIntervalInMs: 100,
}
client := logpuller.NewSubscriptionClient(
Expand Down Expand Up @@ -120,8 +122,10 @@ func (p *ddlJobFetcher) unmarshalDDL(rawKV *common.RawKVEntry) (*model.Job, erro
return nil, nil
}
if p.ddlTableInfo == nil && !mounter.IsLegacyFormatJob(rawKV) {
log.Info("begin to init ddl table info")
err := p.initDDLTableInfo()
if err != nil {
log.Error("init ddl table info failed", zap.Error(err))
return nil, errors.Trace(err)
}
}
Expand Down
81 changes: 62 additions & 19 deletions logservice/schemastore/disk_format.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,13 @@ func loadDatabasesInKVSnap(snap *pebble.Snapshot, gcTs uint64) (map[int64]*Basic
return databaseMap, nil
}

func loadTablesInKVSnap(snap *pebble.Snapshot, gcTs uint64, databaseMap map[int64]*BasicDatabaseInfo) (map[int64]*BasicTableInfo, error) {
func loadTablesInKVSnap(
snap *pebble.Snapshot,
gcTs uint64,
databaseMap map[int64]*BasicDatabaseInfo,
) (map[int64]*BasicTableInfo, map[int64]BasicPartitionInfo, error) {
tablesInKVSnap := make(map[int64]*BasicTableInfo)
partitionsInKVSnap := make(map[int64]BasicPartitionInfo)

startKey, err := tableInfoKey(gcTs, 0)
if err != nil {
Expand All @@ -208,26 +213,32 @@ func loadTablesInKVSnap(snap *pebble.Snapshot, gcTs uint64, databaseMap map[int6
log.Fatal("unmarshal table info entry failed", zap.Error(err))
}

tbNameInfo := model.TableNameInfo{}
if err := json.Unmarshal(table_info_entry.TableInfoValue, &tbNameInfo); err != nil {
log.Fatal("unmarshal table name info failed", zap.Error(err))
tableInfo := model.TableInfo{}
if err := json.Unmarshal(table_info_entry.TableInfoValue, &tableInfo); err != nil {
log.Fatal("unmarshal table info failed", zap.Error(err))
}
databaseInfo, ok := databaseMap[table_info_entry.SchemaID]
if !ok {
log.Panic("database not found",
zap.Int64("schemaID", table_info_entry.SchemaID),
zap.String("schemaName", table_info_entry.SchemaName),
zap.String("tableName", tbNameInfo.Name.O))
zap.String("tableName", tableInfo.Name.O))
}
// TODO: add a unit test for this case
databaseInfo.Tables[tbNameInfo.ID] = true
tablesInKVSnap[tbNameInfo.ID] = &BasicTableInfo{
databaseInfo.Tables[tableInfo.ID] = true
tablesInKVSnap[tableInfo.ID] = &BasicTableInfo{
SchemaID: table_info_entry.SchemaID,
Name: tbNameInfo.Name.O,
Name: tableInfo.Name.O,
}
if tableInfo.Partition != nil {
partitionInfo := make(BasicPartitionInfo)
for _, partition := range tableInfo.Partition.Definitions {
partitionInfo[partition.ID] = nil
}
partitionsInKVSnap[tableInfo.ID] = partitionInfo
}
}

return tablesInKVSnap, nil
return tablesInKVSnap, partitionsInKVSnap, nil
}

// load the ddl jobs in the range (gcTs, upperBound] and apply the ddl job to update database and table info
Expand All @@ -237,6 +248,7 @@ func loadAndApplyDDLHistory(
maxFinishedDDLTs uint64,
databaseMap map[int64]*BasicDatabaseInfo,
tableMap map[int64]*BasicTableInfo,
partitionMap map[int64]BasicPartitionInfo,
) (map[int64][]uint64, []uint64, error) {
tablesDDLHistory := make(map[int64][]uint64)
tableTriggerDDLHistory := make([]uint64, 0)
Expand Down Expand Up @@ -279,7 +291,7 @@ func loadAndApplyDDLHistory(
tableTriggerDDLHistory); err != nil {
log.Panic("updateDDLHistory error", zap.Error(err))
}
if err := updateDatabaseInfoAndTableInfo(&ddlEvent, databaseMap, tableMap); err != nil {
if err := updateDatabaseInfoAndTableInfo(&ddlEvent, databaseMap, tableMap, partitionMap); err != nil {
log.Panic("updateDatabaseInfo error", zap.Error(err))
}
}
Expand Down Expand Up @@ -333,6 +345,17 @@ func readPersistedDDLEvent(snap *pebble.Snapshot, version uint64) PersistedDDLEv
if err := json.Unmarshal(ddlEvent.TableInfoValue, &ddlEvent.TableInfo); err != nil {
log.Fatal("unmarshal table info failed", zap.Error(err))
}
ddlEvent.TableInfoValue = nil

if len(ddlEvent.MultipleTableInfosValue) > 0 {
ddlEvent.MultipleTableInfos = make([]*model.TableInfo, len(ddlEvent.MultipleTableInfosValue))
for i := range ddlEvent.MultipleTableInfosValue {
if err := json.Unmarshal(ddlEvent.MultipleTableInfosValue[i], &ddlEvent.MultipleTableInfos[i]); err != nil {
log.Fatal("unmarshal multi table info failed", zap.Error(err))
}
}
}
ddlEvent.MultipleTableInfosValue = nil
return ddlEvent
}

Expand All @@ -346,6 +369,15 @@ func writePersistedDDLEvent(db *pebble.DB, ddlEvent *PersistedDDLEvent) error {
if err != nil {
return err
}
if len(ddlEvent.MultipleTableInfos) > 0 {
ddlEvent.MultipleTableInfosValue = make([][]byte, len(ddlEvent.MultipleTableInfos))
for i := range ddlEvent.MultipleTableInfos {
ddlEvent.MultipleTableInfosValue[i], err = json.Marshal(ddlEvent.MultipleTableInfos[i])
if err != nil {
return err
}
}
}
ddlValue, err := ddlEvent.MarshalMsg(nil)
if err != nil {
return err
Expand Down Expand Up @@ -526,12 +558,13 @@ func loadAllPhysicalTablesAtTs(
return nil, err
}

tableMap, err := loadTablesInKVSnap(storageSnap, gcTs, databaseMap)
tableMap, partitionMap, err := loadTablesInKVSnap(storageSnap, gcTs, databaseMap)
if err != nil {
return nil, err
}
log.Info("after load tables in kv snap",
zap.Int("tableMapLen", len(tableMap)))
zap.Int("tableMapLen", len(tableMap)),
zap.Int("partitionMapLen", len(partitionMap)))

// apply ddl jobs in range (gcTs, snapVersion]
startKey, err := ddlJobKey(gcTs + 1)
Expand Down Expand Up @@ -559,12 +592,13 @@ func loadAllPhysicalTablesAtTs(
if err := json.Unmarshal(ddlEvent.TableInfoValue, &ddlEvent.TableInfo); err != nil {
log.Fatal("unmarshal table info failed", zap.Error(err))
}
if err := updateDatabaseInfoAndTableInfo(&ddlEvent, databaseMap, tableMap); err != nil {
if err := updateDatabaseInfoAndTableInfo(&ddlEvent, databaseMap, tableMap, partitionMap); err != nil {
log.Panic("updateDatabaseInfo error", zap.Error(err))
}
}
log.Info("after load tables from ddl",
zap.Int("tableMapLen", len(tableMap)))
zap.Int("tableMapLen", len(tableMap)),
zap.Int("partitionMapLen", len(partitionMap)))
tables := make([]common.Table, 0)
for tableID, tableInfo := range tableMap {
if _, ok := databaseMap[tableInfo.SchemaID]; !ok {
Expand All @@ -577,10 +611,19 @@ func loadAllPhysicalTablesAtTs(
if tableFilter != nil && tableFilter.ShouldIgnoreTable(databaseMap[tableInfo.SchemaID].Name, tableInfo.Name) {
continue
}
tables = append(tables, common.Table{
SchemaID: tableInfo.SchemaID,
TableID: tableID,
})
if partitionInfo, ok := partitionMap[tableID]; ok {
for partitionID := range partitionInfo {
tables = append(tables, common.Table{
SchemaID: tableInfo.SchemaID,
TableID: partitionID,
})
}
} else {
tables = append(tables, common.Table{
SchemaID: tableInfo.SchemaID,
TableID: tableID,
})
}
}
return tables, nil
}
Loading

0 comments on commit 6683440

Please sign in to comment.