Skip to content

Commit

Permalink
fix ddl bugs; support check dispatcher count to test more precisely (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
hongyunyan authored Jan 9, 2025
1 parent 62e12ff commit b3e2a5a
Show file tree
Hide file tree
Showing 27 changed files with 672 additions and 393 deletions.
1 change: 1 addition & 0 deletions api/v2/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func RegisterOpenAPIV2Routes(router *gin.Engine, api OpenAPIV2) {
changefeedGroup.POST("/:changefeed_id/pause", coordinatorMiddleware, api.pauseChangefeed)
changefeedGroup.DELETE("/:changefeed_id", coordinatorMiddleware, api.deleteChangefeed)
changefeedGroup.POST("/:changefeed_id/move_table", coordinatorMiddleware, api.moveTable)
changefeedGroup.GET("/:changefeed_id/get_dispatcher_count", coordinatorMiddleware, api.getDispatcherCount)
changefeedGroup.GET("/:changefeed_id/tables", coordinatorMiddleware, api.listTables)

// capture apis
Expand Down
35 changes: 35 additions & 0 deletions api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,41 @@ func (h *OpenAPIV2) listTables(c *gin.Context) {
c.JSON(http.StatusOK, infos)
}

// getDispatcherCount responds with the dispatcher count reported by the
// maintainer of the requested changefeed.
// It exists only for internal tests and is not meant for public use.
func (h *OpenAPIV2) getDispatcherCount(c *gin.Context) {
	displayName := common.NewChangeFeedDisplayName(c.Param(api.APIOpVarChangefeedID), getNamespaceValueWithDefault(c))
	validateErr := model.ValidateChangefeedID(displayName.Name)
	if validateErr != nil {
		_ = c.Error(errors.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", displayName.Name))
		return
	}

	// Resolve the display name to the changefeed ID through the coordinator first.
	co, err := h.server.GetCoordinator()
	if err != nil {
		_ = c.Error(err)
		return
	}
	info, _, err := co.GetChangefeed(c, displayName)
	if err != nil {
		_ = c.Error(err)
		return
	}
	cfID := info.ChangefeedID

	// Then ask the maintainer manager for the maintainer of that changefeed.
	m, found := h.server.GetMaintainerManager().GetMaintainerForChangefeed(cfID)
	if !found {
		log.Error("maintainer not found for changefeed in this node", zap.String("changefeed", cfID.String()))
		_ = c.Error(apperror.ErrMaintainerNotFounded)
		return
	}

	c.JSON(http.StatusOK, &DispatcherCount{Count: m.GetDispatcherCount()})
}

func getNamespaceValueWithDefault(c *gin.Context) string {
namespace := c.Query(api.APIOpVarNamespace)
if namespace == "" {
Expand Down
4 changes: 4 additions & 0 deletions api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -1328,6 +1328,10 @@ type DebeziumConfig struct {
OutputOldValue bool `json:"output_old_value"`
}

// DispatcherCount is the JSON response body written by the
// get_dispatcher_count handler.
type DispatcherCount struct {
// Count is serialized as the "count" field.
Count int `json:"count"`
}

type NodeTableInfo struct {
NodeID string `json:"node_id"`
TableIDs []int64 `json:"table_ids"`
Expand Down
4 changes: 4 additions & 0 deletions maintainer/maintainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -835,3 +835,7 @@ func (m *Maintainer) setWatermark(newWatermark heartbeatpb.Watermark) {
m.watermark.ResolvedTs = newWatermark.ResolvedTs
}
}

// GetDispatcherCount returns the number of tasks returned by the
// maintainer controller's GetAllTasks, which is reported as the dispatcher count.
func (m *Maintainer) GetDispatcherCount() int {
	tasks := m.controller.GetAllTasks()
	return len(tasks)
}
17 changes: 10 additions & 7 deletions maintainer/maintainer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,15 +289,18 @@ func (c *Controller) FinishBootstrap(
delete(workingMap, table.TableID)
}
}
// tables that not included in init table map we get from tikv at checkpoint ts
// that can happen if a table is created after checkpoint ts
// the initial table map only contains real physical tables,
// ddl table is special table id (0), can be included in the bootstrap response message
for tableID, tableMap := range workingMap {
log.Info("found a tables not in initial table map",
// Tables that are not in the initial table map but are reported by other nodes.
// This can happen, for example, as follows:
// node1 holds the table trigger event dispatcher and node2 holds table1; both receive a "drop table1" DDL.
// The table trigger event dispatcher has written the DDL, while node2 has not passed it yet,
// and then node1 restarts.
// node1 gets startTs = ddl1.ts, so table1 is not included in the initial table map,
// and we simply ignore the table1 dispatcher.
// Here tableID is the physical table ID.
for tableID := range workingMap {
log.Warn("found a tables not in initial table map",
zap.String("changefeed", c.changefeedID.Name()),
zap.Int64("id", tableID))
c.addWorkingSpans(tableMap)
}

// rebuild barrier status
Expand Down
4 changes: 2 additions & 2 deletions maintainer/replica/replication_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (db *ReplicationDB) TryRemoveBySchemaID(schemaID int64) []*SpanReplication
db.lock.Lock()
defer db.lock.Unlock()

var tasks = make([]*SpanReplication, 0)
tasks := make([]*SpanReplication, 0)
for _, task := range db.schemaTasks[schemaID] {
db.removeSpanUnLock(task)
// the task is scheduled
Expand Down Expand Up @@ -144,7 +144,7 @@ func (db *ReplicationDB) GetAllTasks() []*SpanReplication {
db.lock.RLock()
defer db.lock.RUnlock()

var tasks = make([]*SpanReplication, 0, len(db.allTasks))
tasks := make([]*SpanReplication, 0, len(db.allTasks))
for _, task := range db.allTasks {
tasks = append(tasks, task)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/common/context/app_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const (
EventService = "EventService"
DispatcherDynamicStream = "DispatcherDynamicStream"
MaintainerManager = "MaintainerManager"
DispatcherOrchestrator = "DispatcherOrchestrator"
)

// Put all the global instances here.
Expand Down
12 changes: 12 additions & 0 deletions pkg/config/upstream.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package config

import (
Expand Down
6 changes: 2 additions & 4 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ type server struct {
coordinatorMu sync.Mutex
coordinator tiserver.Coordinator

dispatcherOrchestrator *dispatcherorchestrator.DispatcherOrchestrator

// session keeps alive between the server and etcd
session *concurrency.Session

Expand Down Expand Up @@ -117,13 +115,13 @@ func (c *server) initialize(ctx context.Context) error {
return errors.Trace(err)
}

messageCenter := messaging.NewMessageCenter(ctx, c.info.ID, c.info.Epoch, config.NewDefaultMessageCenterConfig())
appcontext.SetID(c.info.ID.String())
messageCenter := messaging.NewMessageCenter(ctx, c.info.ID, c.info.Epoch, config.NewDefaultMessageCenterConfig())
appcontext.SetService(appcontext.MessageCenter, messageCenter)

appcontext.SetService(appcontext.EventCollector, eventcollector.New(ctx, c.info.ID))
appcontext.SetService(appcontext.HeartbeatCollector, dispatchermanager.NewHeartBeatCollector(c.info.ID))
c.dispatcherOrchestrator = dispatcherorchestrator.New()
appcontext.SetService(appcontext.DispatcherOrchestrator, dispatcherorchestrator.New())

nodeManager := watcher.NewNodeManager(c.session, c.EtcdClient)
nodeManager.RegisterNodeChangeHandler(
Expand Down
18 changes: 18 additions & 0 deletions tests/integration_tests/_utils/check_coordinator_and_maintainer
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/bin/bash
# parameter 1: ip addr with port
# parameter 2: changefeed id
# parameter 3: retry count

# This script checks that both the coordinator and the maintainer exist on the target node.
# It calls query_dispatcher_count with a target count of -1, which only requires the
# returned count to be non-null. A non-null count means the query did not hit the
# "maintainer not found" error, i.e. the maintainer and the coordinator exist on the node.

set -ex

ipAddr=${1}
changefeedID=${2}
retryCount=${3}

echo "check coordinator and maintainer"

# Quote every argument so that an empty or whitespace-containing value
# cannot shift the positional parameters seen by query_dispatcher_count.
query_dispatcher_count "${ipAddr}" "${changefeedID}" -1 "${retryCount}"
29 changes: 29 additions & 0 deletions tests/integration_tests/_utils/get_table_id
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#!/bin/bash

# This script prints the table id for a given table name. The id is read from
# the "id" field of the schema endpoint at http://127.0.0.1:10080.
# It exits 0 after printing the id, or 1 when no id was obtained within the retries.

# parameter 1: db name
# parameter 2: table name
# parameter 3: retry count (optional, defaults to 5)

set -ex

dbName=${1}
tableName=${2}

# Honor the documented third parameter; fall back to the previous fixed value of 5.
MAX_RETRIES=${3:-5}
retries=0

while [ "$retries" -lt "$MAX_RETRIES" ]; do
	# Under `set -e` a failing jq (e.g. the response is not valid JSON) would abort
	# the whole script; reset id instead so that the loop can actually retry.
	id=$(curl http://127.0.0.1:10080/schema/"${dbName}"/"${tableName}" | jq .id) || id=""
	# jq prints "null" when the response has no id field, which is not a table id.
	if [ -n "$id" ] && [ "$id" != "null" ]; then
		echo "$id"
		exit 0
	fi
	retries=$((retries + 1))
	sleep 1
done

echo "Failed to get table id"
exit 1
38 changes: 38 additions & 0 deletions tests/integration_tests/_utils/query_dispatcher_count
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#!/bin/bash
# Polls the get_dispatcher_count API of a changefeed until the returned count is acceptable.
#
# parameter 1: ip addr with port
# parameter 2: changefeed id
# parameter 3: target count; -1 means the exact count does not matter, it just must not be null
# parameter 4: retry count
#
# Exits 0 as soon as the count is acceptable, or 1 after all retries are used up.

set -ex

ipAddr=${1}
changefeedID=${2}
target=${3}
retryCount=${4}

echo "query dispatcher count"
count=0
value=""

while [[ $count -lt $retryCount ]]; do
	# A failed request or a body that is not JSON must lead to a retry
	# instead of tripping `set -e` and aborting the whole script.
	ans=$(curl -X GET http://"${ipAddr}"/api/v2/changefeeds/"${changefeedID}"/get_dispatcher_count) || ans=""
	echo "$ans"
	value=$(echo "$ans" | jq -r '.count') || value=""

	if [ "$target" == "-1" ]; then
		# An empty value means there was no usable response, so it is not a count either.
		if [ -n "$value" ] && [ "$value" != "null" ]; then
			exit 0
		fi
	elif [ -n "$value" ] && [ "$value" == "$target" ]; then
		exit 0
	fi

	sleep 2

	count=$((count + 1))
done

echo "query dispatcher count $retryCount retries, final value: $value, target: $target"
exit 1
Loading

0 comments on commit b3e2a5a

Please sign in to comment.