diff --git a/.github/workflows/integration_test_mysql.yaml b/.github/workflows/integration_test_mysql.yaml index ab217af0..4c8d2de4 100644 --- a/.github/workflows/integration_test_mysql.yaml +++ b/.github/workflows/integration_test_mysql.yaml @@ -334,17 +334,23 @@ jobs: run: | export TICDC_NEWARCH=true && make integration_test CASE=force_replicate_table - # the 12th case in this group - name: Test tidb_mysql_test if: ${{ success() }} run: | export TICDC_NEWARCH=true && make integration_test CASE=tidb_mysql_test - - name: Test resolve_lock + + - name: Test resolve_lock if: ${{ success() }} run: | export TICDC_NEWARCH=true && make integration_test CASE=resolve_lock + # The 14th case in this group + - name: Test move_table + if: ${{ success() }} + run: | + export TICDC_NEWARCH=true && make integration_test CASE=move_table + - name: Upload test logs if: always() uses: ./.github/actions/upload-test-logs diff --git a/api/v2/api.go b/api/v2/api.go index 6296e14a..a2207b8b 100644 --- a/api/v2/api.go +++ b/api/v2/api.go @@ -41,7 +41,7 @@ func RegisterOpenAPIV2Routes(router *gin.Engine, api OpenAPIV2) { // For compatibility with the old API. // TiDB Operator relies on this API to determine whether the TiCDC node is healthy. router.GET("/status", api.serverStatus) - // Intergration test relies on this API to determine whether the TiCDC node is healthy. + // Integration test relies on this API to determine whether the TiCDC node is healthy. router.GET("/debug/info", gin.WrapF(api.handleDebugInfo)) coordinatorMiddleware := middleware.ForwardToCoordinatorMiddleware(api.server) @@ -57,9 +57,11 @@ func RegisterOpenAPIV2Routes(router *gin.Engine, api OpenAPIV2) { changefeedGroup.POST("/:changefeed_id/resume", coordinatorMiddleware, authenticateMiddleware, api.resumeChangefeed) changefeedGroup.POST("/:changefeed_id/pause", coordinatorMiddleware, authenticateMiddleware, api.pauseChangefeed) changefeedGroup.DELETE("/:changefeed_id", coordinatorMiddleware, authenticateMiddleware, api.deleteChangefeed) - changefeedGroup.POST("/:changefeed_id/move_table", coordinatorMiddleware, authenticateMiddleware, api.moveTable) - changefeedGroup.GET("/:changefeed_id/get_dispatcher_count", coordinatorMiddleware, api.getDispatcherCount) - changefeedGroup.GET("/:changefeed_id/tables", coordinatorMiddleware, api.listTables) + + // internal APIs + changefeedGroup.POST("/:changefeed_id/move_table", authenticateMiddleware, api.moveTable) + changefeedGroup.GET("/:changefeed_id/get_dispatcher_count", api.getDispatcherCount) + changefeedGroup.GET("/:changefeed_id/tables", api.listTables) // capture apis captureGroup := v2.Group("/captures") diff --git a/api/v2/changefeed.go b/api/v2/changefeed.go index 8c340166..90997a82 100644 --- a/api/v2/changefeed.go +++ b/api/v2/changefeed.go @@ -24,6 +24,7 @@ import ( "github.com/gin-gonic/gin" "github.com/pingcap/log" + "github.com/pingcap/ticdc/api/middleware" "github.com/pingcap/ticdc/downstreamadapter/sink" apperror "github.com/pingcap/ticdc/pkg/apperror" "github.com/pingcap/ticdc/pkg/common" @@ -200,9 +201,13 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) { log.Info("Create changefeed successfully!", zap.String("id", info.ChangefeedID.Name()), zap.String("changefeed", info.String())) - c.JSON(http.StatusOK, toAPIModel(info, - info.StartTs, info.StartTs, - nil)) + c.JSON(http.StatusOK, toAPIModel( + info, + &config.ChangeFeedStatus{ + CheckpointTs: info.StartTs, + }, + nil, + )) } // listChangeFeeds lists all changefeeds in cdc cluster @@ -291,15 +296,13 @@ func (h *OpenAPIV2) getChangeFeed(c *gin.Context) { } taskStatus := make([]model.CaptureTaskStatus, 0) - detail := toAPIModel(cfInfo, status.CheckpointTs, - status.CheckpointTs, taskStatus) + detail := toAPIModel(cfInfo, status, taskStatus) c.JSON(http.StatusOK, detail) } func toAPIModel( info *config.ChangeFeedInfo, - resolvedTs uint64, - checkpointTs uint64, + status *config.ChangeFeedStatus, taskStatus []model.CaptureTaskStatus, ) *ChangeFeedInfo { var runningError *RunningError @@ -332,10 +335,12 @@ func toAPIModel( State: info.State, Error: runningError, CreatorVersion: info.CreatorVersion, - CheckpointTs: checkpointTs, - ResolvedTs: resolvedTs, - CheckpointTime: model.JSONTime(oracle.GetTimeFromTS(checkpointTs)), + CheckpointTs: status.CheckpointTs, + ResolvedTs: status.CheckpointTs, + CheckpointTime: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)), TaskStatus: taskStatus, + MaintainerAddr: status.GetMaintainerAddr(), + GID: info.ChangefeedID.ID(), } return apiInfoModel } @@ -618,7 +623,7 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) { return } - c.JSON(http.StatusOK, toAPIModel(oldCfInfo, status.CheckpointTs, status.CheckpointTs, nil)) + c.JSON(http.StatusOK, toAPIModel(oldCfInfo, status, nil)) } // verifyResumeChangefeedConfig verifies the changefeed config before resuming a changefeed @@ -688,24 +693,42 @@ func (h *OpenAPIV2) moveTable(c *gin.Context) { changefeedDisplayName.Name)) return } + // get changefeedID first - coordinator, err := h.server.GetCoordinator() + cfInfo, err := getChangeFeed(c.Request.Host, changefeedDisplayName.Name) if err != nil { _ = c.Error(err) return } - cfInfo, _, err := coordinator.GetChangefeed(c, changefeedDisplayName) + + if cfInfo.MaintainerAddr == "" { + _ = c.Error(errors.New("Can't not find maintainer for changefeed: " + changefeedDisplayName.Name)) + return + } + + selfInfo, err := h.server.SelfInfo() if err != nil { _ = c.Error(err) return } - changefeedID := cfInfo.ChangefeedID + + if cfInfo.MaintainerAddr != selfInfo.AdvertiseAddr { + // Forward the request to the maintainer + middleware.ForwardToServer(c, selfInfo.ID, cfInfo.MaintainerAddr) + c.Abort() + return + } + + changefeedID := common.ChangeFeedID{ + Id: cfInfo.GID, + DisplayName: common.NewChangeFeedDisplayName(cfInfo.ID, cfInfo.Namespace), + } maintainerManager := h.server.GetMaintainerManager() maintainer, ok := maintainerManager.GetMaintainerForChangefeed(changefeedID) if !ok { - log.Error("maintainer not found for changefeed in this node", zap.String("changefeed", changefeedID.String())) + log.Error("maintainer not found for changefeed in this node", zap.String("GID", changefeedID.Id.String()), zap.String("Name", changefeedID.DisplayName.String())) _ = c.Error(apperror.ErrMaintainerNotFounded) return } @@ -732,24 +755,40 @@ func (h *OpenAPIV2) listTables(c *gin.Context) { return } - coordinator, err := h.server.GetCoordinator() + // get changefeedID first + cfInfo, err := getChangeFeed(c.Request.Host, changefeedDisplayName.Name) if err != nil { _ = c.Error(err) return } - cfInfo, _, err := coordinator.GetChangefeed(c, changefeedDisplayName) + if cfInfo.MaintainerAddr == "" { + _ = c.Error(errors.New("Can't not find maintainer for changefeed: " + changefeedDisplayName.Name)) + return + } + + selfInfo, err := h.server.SelfInfo() if err != nil { _ = c.Error(err) return } - changefeedID := cfInfo.ChangefeedID + if cfInfo.MaintainerAddr != selfInfo.AdvertiseAddr { + // Forward the request to the maintainer + middleware.ForwardToServer(c, selfInfo.ID, cfInfo.MaintainerAddr) + c.Abort() + return + } + + changefeedID := common.ChangeFeedID{ + Id: cfInfo.GID, + DisplayName: common.NewChangeFeedDisplayName(cfInfo.ID, cfInfo.Namespace), + } maintainerManager := h.server.GetMaintainerManager() maintainer, ok := maintainerManager.GetMaintainerForChangefeed(changefeedID) if !ok { - log.Error("maintainer not found for changefeed in this node", zap.String("changefeed", changefeedID.String())) + log.Error("maintainer not found for changefeed in this node", zap.String("GID", changefeedID.Id.String()), zap.String("Name", changefeedID.DisplayName.String())) _ = c.Error(apperror.ErrMaintainerNotFounded) return } @@ -768,11 +807,16 @@ func (h *OpenAPIV2) listTables(c *gin.Context) { nodeTableInfo.addTableID(table.Span.TableID) } - infos := make([]*NodeTableInfo, 0, len(nodeTableInfoMap)) + infos := make([]NodeTableInfo, 0, len(nodeTableInfoMap)) for _, nodeTableInfo := range nodeTableInfoMap { - infos = append(infos, nodeTableInfo) + infos = append(infos, *nodeTableInfo) + } + + resp := &ListResponse[NodeTableInfo]{ + Total: len(infos), + Items: infos, } - c.JSON(http.StatusOK, infos) + c.JSON(http.StatusOK, resp) } // getDispatcherCount returns the count of dispatcher. @@ -784,24 +828,41 @@ func (h *OpenAPIV2) getDispatcherCount(c *gin.Context) { changefeedDisplayName.Name)) return } - // get changefeefID first - coordinator, err := h.server.GetCoordinator() + + cfInfo, err := getChangeFeed(c.Request.Host, changefeedDisplayName.Name) if err != nil { _ = c.Error(err) return } - cfInfo, _, err := coordinator.GetChangefeed(c, changefeedDisplayName) + + if cfInfo.MaintainerAddr == "" { + _ = c.Error(errors.New("Can't not find maintainer for changefeed: " + changefeedDisplayName.Name)) + return + } + + selfInfo, err := h.server.SelfInfo() if err != nil { _ = c.Error(err) return } - changefeedID := cfInfo.ChangefeedID + + if cfInfo.MaintainerAddr != selfInfo.AdvertiseAddr { + // Forward the request to the maintainer + middleware.ForwardToServer(c, selfInfo.ID, cfInfo.MaintainerAddr) + c.Abort() + return + } + + changefeedID := common.ChangeFeedID{ + Id: cfInfo.GID, + DisplayName: common.NewChangeFeedDisplayName(cfInfo.ID, cfInfo.Namespace), + } maintainerManager := h.server.GetMaintainerManager() maintainer, ok := maintainerManager.GetMaintainerForChangefeed(changefeedID) if !ok { - log.Error("maintainer not found for changefeed in this node", zap.String("changefeed", changefeedID.String())) + log.Error("maintainer not found for changefeed in this node", zap.String("GID", changefeedID.Id.String()), zap.String("changefeed", changefeedID.String())) _ = c.Error(apperror.ErrMaintainerNotFounded) return } diff --git a/api/v2/helper.go b/api/v2/helper.go new file mode 100644 index 00000000..5017a362 --- /dev/null +++ b/api/v2/helper.go @@ -0,0 +1,81 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package v2 + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + "github.com/pingcap/log" + "github.com/pingcap/ticdc/pkg/config" + "github.com/pingcap/tiflow/pkg/httputil" + "go.uber.org/zap" +) + +func getChangeFeed(host, cfName string) (ChangeFeedInfo, error) { + security := config.GetGlobalServerConfig().Security + + uri := fmt.Sprintf("/api/v2/changefeeds/%s", cfName) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + req, err := http.NewRequestWithContext( + ctx, "GET", uri, nil) + if err != nil { + return ChangeFeedInfo{}, err + } + req.URL.Host = host + + if tls, _ := security.ToTLSConfigWithVerify(); tls != nil { + req.URL.Scheme = "https" + } else { + req.URL.Scheme = "http" + } + + client, err := httputil.NewClient(security) + if err != nil { + return ChangeFeedInfo{}, err + } + + log.Info("Send request to coordinator to get changefeed info", + zap.String("host", host), + zap.String("uri", uri), + zap.String("schema", req.URL.Scheme), + ) + + resp, err := client.Do(req) + if err != nil { + log.Error("failed to get changefeed", zap.Error(err), zap.String("uri", uri)) + return ChangeFeedInfo{}, err + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + log.Error("failed to read changefeed response", zap.Error(err), zap.String("uri", uri)) + return ChangeFeedInfo{}, err + } + + var cfInfo ChangeFeedInfo + if err := json.Unmarshal(body, &cfInfo); err != nil { + log.Error("failed to unmarshal changefeed response", zap.Error(err), zap.String("uri", uri)) + return ChangeFeedInfo{}, err + } + + return cfInfo, nil +} diff --git a/api/v2/model.go b/api/v2/model.go index 90c7fb03..7625627e 100644 --- a/api/v2/model.go +++ b/api/v2/model.go @@ -17,6 +17,7 @@ import ( "encoding/json" "time" + "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/tiflow/cdc/model" @@ -1086,6 +1087,9 @@ type ChangeFeedInfo struct { CheckpointTs uint64 `json:"checkpoint_ts"` CheckpointTime model.JSONTime `json:"checkpoint_time"` TaskStatus []model.CaptureTaskStatus `json:"task_status,omitempty"` + + GID common.GID `json:"gid"` + MaintainerAddr string `json:"maintainer_addr,omitempty"` } // SyncedStatus describes the detail of a changefeed's synced status diff --git a/coordinator/controller.go b/coordinator/controller.go index 2e33e832..a7de76e2 100644 --- a/coordinator/controller.go +++ b/coordinator/controller.go @@ -502,7 +502,14 @@ func (c *Controller) ListChangefeeds(_ context.Context) ([]*config.ChangeFeedInf return infos, statuses, nil } -func (c *Controller) GetChangefeed(_ context.Context, changefeedDisplayName common.ChangeFeedDisplayName) (*config.ChangeFeedInfo, *config.ChangeFeedStatus, error) { +func (c *Controller) GetChangefeed( + _ context.Context, + changefeedDisplayName common.ChangeFeedDisplayName, +) ( + *config.ChangeFeedInfo, + *config.ChangeFeedStatus, + error, +) { c.apiLock.RLock() defer c.apiLock.RUnlock() @@ -510,7 +517,16 @@ func (c *Controller) GetChangefeed(_ context.Context, changefeedDisplayName comm if cf == nil { return nil, nil, errors.ErrChangeFeedNotExists.GenWithStackByArgs(changefeedDisplayName.Name) } - return cf.GetInfo(), &config.ChangeFeedStatus{CheckpointTs: cf.GetStatus().CheckpointTs}, nil + + maintainerID := cf.GetNodeID() + nodeInfo := c.nodeManager.GetNodeInfo(maintainerID) + maintainerAddr := "" + if nodeInfo != nil { + maintainerAddr = nodeInfo.AdvertiseAddr + } + status := &config.ChangeFeedStatus{CheckpointTs: cf.GetStatus().CheckpointTs} + status.SetMaintainerAddr(maintainerAddr) + return cf.GetInfo(), status, nil } // GetTask queries a task by channgefeed ID, return nil if not found diff --git a/coordinator/controller_test.go b/coordinator/controller_test.go index 367fe97f..026f50f8 100644 --- a/coordinator/controller_test.go +++ b/coordinator/controller_test.go @@ -19,7 +19,7 @@ import ( "github.com/golang/mock/gomock" "github.com/pingcap/ticdc/coordinator/changefeed" - "github.com/pingcap/ticdc/coordinator/changefeed/mock" + mock_changefeed "github.com/pingcap/ticdc/coordinator/changefeed/mock" "github.com/pingcap/ticdc/coordinator/operator" "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/config" @@ -137,9 +137,11 @@ func TestGetChangefeed(t *testing.T) { ctrl := gomock.NewController(t) backend := mock_changefeed.NewMockBackend(ctrl) changefeedDB := changefeed.NewChangefeedDB(1216) + nodeManager := watcher.NewNodeManager(nil, nil) controller := &Controller{ backend: backend, changefeedDB: changefeedDB, + nodeManager: nodeManager, } cfID := common.NewChangeFeedIDWithName("test") cf := changefeed.NewChangefeed(cfID, &config.ChangeFeedInfo{ diff --git a/maintainer/maintainer_controller.go b/maintainer/maintainer_controller.go index 4ae2b652..92b0a523 100644 --- a/maintainer/maintainer_controller.go +++ b/maintainer/maintainer_controller.go @@ -449,7 +449,7 @@ func (c *Controller) moveTable(tableId int64, targetNode node.ID) error { count := 0 maxTry := 30 for !op.IsFinished() && count < maxTry { - time.Sleep(500 * time.Millisecond) + time.Sleep(1 * time.Second) count += 1 log.Info("wait for move table operator finished", zap.Int("count", count)) } diff --git a/maintainer/maintainer_manager.go b/maintainer/maintainer_manager.go index 1dba3cbe..67fac81e 100644 --- a/maintainer/maintainer_manager.go +++ b/maintainer/maintainer_manager.go @@ -358,6 +358,14 @@ func (m *Manager) dispatcherMaintainerMessage( func (m *Manager) GetMaintainerForChangefeed(changefeedID common.ChangeFeedID) (*Maintainer, bool) { c, ok := m.maintainers.Load(changefeedID) + + m.maintainers.Range(func(key, value interface{}) bool { + log.Info("fizz maintainer", + zap.String("GID", key.(common.ChangeFeedID).Id.String()), + zap.String("Name", key.(common.ChangeFeedID).DisplayName.String())) + return true + }) + if !ok { return nil, false } diff --git a/pkg/config/changefeed.go b/pkg/config/changefeed.go index 1f41a7ef..e65a713f 100644 --- a/pkg/config/changefeed.go +++ b/pkg/config/changefeed.go @@ -484,6 +484,10 @@ type ChangeFeedStatus struct { CheckpointTs uint64 `json:"checkpoint-ts"` // Progress indicates changefeed progress status Progress Progress `json:"progress"` + + // MaintainerAddr is the address of the changefeed's maintainer + // It is used to identify the changefeed's maintainer, and it is not stored in etcd. + maintainerAddr string `json:"-"` } // Marshal returns json encoded string of ChangeFeedStatus, only contains necessary fields stored in storage @@ -492,9 +496,19 @@ func (status *ChangeFeedStatus) Marshal() (string, error) { return string(data), cerror.WrapError(cerror.ErrMarshalFailed, err) } -// Unmarshal unmarshals into *ChangeFeedStatus from json marshal byte slice +// Unmarshal into *ChangeFeedStatus from json marshal byte slice func (status *ChangeFeedStatus) Unmarshal(data []byte) error { err := json.Unmarshal(data, status) return errors.Annotatef( cerror.WrapError(cerror.ErrUnmarshalFailed, err), "Unmarshal data: %v", data) } + +// GetMaintainerAddr returns the address of the changefeed's maintainer +func (status *ChangeFeedStatus) GetMaintainerAddr() string { + return status.maintainerAddr +} + +// SetMaintainerAddr sets the address of the changefeed's maintainer +func (status *ChangeFeedStatus) SetMaintainerAddr(addr string) { + status.maintainerAddr = addr +} diff --git a/server/watcher/module_node_manager.go b/server/watcher/module_node_manager.go index 75640682..7f072dae 100644 --- a/server/watcher/module_node_manager.go +++ b/server/watcher/module_node_manager.go @@ -146,6 +146,10 @@ func (c *NodeManager) GetAliveNodes() map[node.ID]*node.Info { return *c.nodes.Load() } +func (c *NodeManager) GetNodeInfo(id node.ID) *node.Info { + return (*c.nodes.Load())[id] +} + func (c *NodeManager) Run(ctx context.Context) error { cfg := config.GetGlobalServerConfig() watcher := NewEtcdWatcher(c.etcdClient, diff --git a/tests/integration_tests/move_table/main.go b/tests/integration_tests/move_table/main.go index 3352d663..0741b6e1 100644 --- a/tests/integration_tests/move_table/main.go +++ b/tests/integration_tests/move_table/main.go @@ -11,11 +11,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -// This is a program that drives the CDC cluster to move a table +// Description: +// The program tests whether TiCDC can successfully move all tables replicated +// by one capture node to another capture node, ensuring the original capture +// becomes empty after the movement. + package main import ( - "bytes" "context" "encoding/json" "flag" @@ -26,29 +29,20 @@ import ( "time" "github.com/pingcap/log" + v2 "github.com/pingcap/ticdc/api/v2" + clientv2 "github.com/pingcap/ticdc/pkg/api/v2" "github.com/pingcap/ticdc/pkg/errors" - "github.com/pingcap/ticdc/pkg/etcd" - "github.com/pingcap/ticdc/pkg/retry" - "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/httputil" "github.com/pingcap/tiflow/pkg/security" - "go.etcd.io/etcd/client/pkg/v3/logutil" - clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" "go.uber.org/zap/zapcore" - "google.golang.org/grpc" - "google.golang.org/grpc/backoff" ) var ( - pd = flag.String("pd", "http://127.0.0.1:2379", "PD address and port") + cdcAddr = flag.String("cdc-addr", "127.0.0.1:8300", "CDC API address and port") logLevel = flag.String("log-level", "debug", "Set log level of the logger") ) -const ( - maxCheckSourceEmptyRetries = 30 -) - // This program moves all tables replicated by a certain capture to other captures, // and makes sure that the original capture becomes empty. func main() { @@ -57,204 +51,143 @@ func main() { log.SetLevel(zapcore.DebugLevel) } - log.Info("table mover started") - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) - defer cancel() + log.Info("move table test starting...") - cluster, err := newCluster(ctx, *pd) + cluster, err := newCluster() if err != nil { log.Fatal("failed to create cluster info", zap.Error(err)) } - err = retry.Do(ctx, func() error { - err := cluster.refreshInfo(ctx) - if err != nil { - log.Warn("error refreshing cluster info", zap.Error(err)) - } - - log.Info("task status", zap.Reflect("status", cluster.captures)) - - if len(cluster.captures) <= 1 { - return errors.New("too few captures") - } - return nil - }, retry.WithBackoffBaseDelay(100), retry.WithMaxTries(20), retry.WithIsRetryableErr(errors.IsRetryableError)) - if err != nil { - log.Fatal("Fail to get captures", zap.Error(err)) - } - - var sourceCapture string - - for capture, tables := range cluster.captures { - if len(tables) == 0 { - continue - } - sourceCapture = capture - break - } - - var targetCapture string - - for candidateCapture := range cluster.captures { - if candidateCapture != sourceCapture { - targetCapture = candidateCapture - } - } - if targetCapture == "" { - log.Fatal("no target, unexpected") - } + sourceNode, targetNode := getSourceAndTargetNode(cluster) - err = cluster.moveAllTables(ctx, sourceCapture, targetCapture) + err = cluster.moveAllTables(sourceNode, targetNode) if err != nil { log.Fatal("failed to move tables", zap.Error(err)) } - log.Info("all tables are moved", zap.String("sourceCapture", sourceCapture), zap.String("targetCapture", targetCapture)) + log.Info("all tables are moved", zap.String("sourceNode", sourceNode), zap.String("targetNode", targetNode)) + + cluster.checkSourceEmpty(sourceNode) } type tableInfo struct { - ID int64 - Changefeed string + changefeedNameSpace string + changefeedName string + + // table id + id int64 } +// cluster is a struct that contains the ticdc cluster's +// api client and the table info of each node +// it is used to move all tables from source node to target node type cluster struct { - ownerAddr string - captures map[string][]*tableInfo - cdcEtcdCli *etcd.CDCEtcdClientImpl + client clientv2.APIV2Interface + servers map[string][]tableInfo } -func newCluster(ctx context.Context, pd string) (*cluster, error) { - logConfig := logutil.DefaultZapLoggerConfig - logConfig.Level = zap.NewAtomicLevelAt(zapcore.ErrorLevel) - - etcdCli, err := clientv3.New(clientv3.Config{ - Endpoints: []string{pd}, - TLS: nil, - Context: ctx, - LogConfig: &logConfig, - DialTimeout: 5 * time.Second, - DialOptions: []grpc.DialOption{ - grpc.WithInsecure(), - grpc.WithBlock(), - grpc.WithConnectParams(grpc.ConnectParams{ - Backoff: backoff.Config{ - BaseDelay: time.Second, - Multiplier: 1.1, - Jitter: 0.1, - MaxDelay: 3 * time.Second, - }, - MinConnectTimeout: 3 * time.Second, - }), - }, - }) - if err != nil { - return nil, errors.Trace(err) - } +func newCluster() (*cluster, error) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() - cdcEtcdCli, err := etcd.NewCDCEtcdClient(ctx, etcdCli, etcd.DefaultCDCClusterID) + client, err := clientv2.NewAPIClient(*cdcAddr, nil, nil) if err != nil { return nil, errors.Trace(err) } - ret := &cluster{ - ownerAddr: "", - captures: nil, - cdcEtcdCli: cdcEtcdCli, - } - log.Info("new cluster initialized") - - return ret, nil -} - -func (c *cluster) moveAllTables(ctx context.Context, sourceCapture, targetCapture string) error { - // move all tables to another capture - for _, table := range c.captures[sourceCapture] { - err := moveTable(ctx, c.ownerAddr, table.Changefeed, targetCapture, table.ID) + var servers []v2.Capture + for { + servers, err = client.Captures().List(ctx) if err != nil { - log.Warn("failed to move table", zap.Error(err)) - continue + return nil, errors.Trace(err) } - - log.Info("moved table successful", zap.Int64("tableID", table.ID)) + if len(servers) > 1 { + break + } + log.Info("waiting for servers to be ready", zap.Int("serverNum", len(servers))) + time.Sleep(1 * time.Second) } - return nil -} + serversMap := make(map[string][]tableInfo) + for _, server := range servers { + serversMap[server.ID] = make([]tableInfo, 0) + } -func (c *cluster) refreshInfo(ctx context.Context) error { - ownerID, err := c.cdcEtcdCli.GetOwnerID(ctx) + changefeeds, err := client.Changefeeds().List(ctx, "default", "all") if err != nil { - return errors.Trace(err) + return nil, errors.Trace(err) } - log.Debug("retrieved owner ID", zap.String("ownerID", ownerID)) + for _, changefeed := range changefeeds { + nodeTableInfos, err := listTables(*cdcAddr, changefeed.ID) + if err != nil { + return nil, errors.Trace(err) + } - captureInfo, err := c.cdcEtcdCli.GetCaptureInfo(ctx, ownerID) - if err != nil { - return errors.Trace(err) - } + log.Info("list tables", zap.Any("nodeTableInfos", nodeTableInfos)) - log.Debug("retrieved owner addr", zap.String("ownerAddr", captureInfo.AdvertiseAddr)) - c.ownerAddr = captureInfo.AdvertiseAddr + for _, nodeTableInfo := range nodeTableInfos { + for _, tableID := range nodeTableInfo.TableIDs { + serversMap[nodeTableInfo.NodeID] = append(serversMap[nodeTableInfo.NodeID], tableInfo{ + changefeedNameSpace: changefeed.Namespace, + changefeedName: changefeed.ID, + id: tableID, + }) + } + } - _, changefeeds, err := c.cdcEtcdCli.GetChangeFeeds(ctx) - if err != nil { - return errors.Trace(err) - } - if len(changefeeds) == 0 { - return errors.New("No changefeed") } - log.Debug("retrieved changefeeds", zap.Reflect("changefeeds", changefeeds)) - var changefeed string - for k := range changefeeds { - changefeed = k.ID - break + ret := &cluster{ + client: client, + servers: serversMap, } - c.captures = make(map[string][]*tableInfo) - _, captures, err := c.cdcEtcdCli.GetCaptures(ctx) - if err != nil { - return errors.Trace(err) - } - for _, capture := range captures { - c.captures[capture.ID] = make([]*tableInfo, 0) - processorDetails, err := queryProcessor(c.ownerAddr, changefeed, capture.ID) + log.Info("new cluster initialized") + + return ret, nil +} + +// moveAllTables moves all tables from source node to target node +func (c *cluster) moveAllTables(sourceNode, targetNode string) error { + for _, table := range c.servers[sourceNode] { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + err := c. + client. + Changefeeds(). + MoveTable(ctx, table.changefeedNameSpace, table.changefeedName, table.id, targetNode) + + log.Info("move table", + zap.String("sourceNode", sourceNode), + zap.String("targetNode", targetNode), + zap.Int64("tableID", table.id), + zap.Error(err)) + if err != nil { + log.Error("failed to move table", zap.Error(err)) return errors.Trace(err) } - - log.Debug("retrieved processor details", - zap.String("changefeed", changefeed), - zap.String("captureID", capture.ID), - zap.Any("processorDetail", processorDetails)) - for _, tableID := range processorDetails.Tables { - c.captures[capture.ID] = append(c.captures[capture.ID], &tableInfo{ - ID: tableID, - Changefeed: changefeed, - }) - } } return nil } -// queryProcessor invokes the following API to get the mapping from +// listTables invokes the following API to get the mapping from // captureIDs to tableIDs: // -// GET /api/v1/processors/{changefeed_id}/{capture_id} -func queryProcessor( - apiEndpoint string, - changefeed string, - captureID string, -) (*model.ProcessorDetail, error) { +// GET /api/v2/changefeed/{changefeed_id}/tables +func listTables( + host string, + changefeedID string, +) ([]v2.NodeTableInfo, error) { httpClient, err := httputil.NewClient(&security.Credential{ /* no TLS */ }) if err != nil { return nil, errors.Trace(err) } - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - requestURL := fmt.Sprintf("http://%s/api/v1/processors/%s/%s", apiEndpoint, changefeed, captureID) + requestURL := fmt.Sprintf("http://%s/api/v2/changefeeds/%s/tables", host, changefeedID) resp, err := httpClient.Get(ctx, requestURL) if err != nil { return nil, errors.Trace(err) @@ -262,7 +195,8 @@ func queryProcessor( defer func() { _ = resp.Body.Close() }() - if resp.StatusCode < 200 || resp.StatusCode >= 300 { + + if resp.StatusCode != http.StatusOK { return nil, errors.Trace( errors.Errorf("HTTP API returned error status: %d, url: %s", resp.StatusCode, requestURL)) } @@ -272,38 +206,64 @@ func queryProcessor( return nil, errors.Trace(err) } - var ret model.ProcessorDetail + var ret v2.ListResponse[v2.NodeTableInfo] err = json.Unmarshal(bodyBytes, &ret) if err != nil { return nil, errors.Trace(err) } - return &ret, nil + return ret.Items, nil } -func moveTable(ctx context.Context, ownerAddr string, changefeed string, target string, tableID int64) error { - formStr := fmt.Sprintf("cf-id=%s&target-cp-id=%s&table-id=%d", changefeed, target, tableID) - log.Debug("preparing HTTP API call to owner", zap.String("formStr", formStr)) - rd := bytes.NewReader([]byte(formStr)) - req, err := http.NewRequestWithContext(ctx, "POST", "http://"+ownerAddr+"/capture/owner/move_table", rd) +func (c *cluster) checkSourceEmpty(sourceNode string) { + clusterForCheck, err := newCluster() if err != nil { - return errors.Trace(err) + log.Fatal("failed to create cluster info", zap.Error(err)) } - req.Header.Set("Content-Type", "application/x-www-form-urlencoded") - resp, err := http.DefaultClient.Do(req) - if err != nil { - return errors.Trace(err) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + sourceTables := clusterForCheck.servers[sourceNode] + if len(sourceTables) != 0 { + log.Info("source capture is not empty, retrying", zap.Any("sourceTables", sourceTables)) + } else { + log.Info("source capture is empty, done") + return + } + case <-ctx.Done(): + log.Fatal("context done") + } } +} - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - body, err := io.ReadAll(resp.Body) - if err != nil { - return errors.Trace(err) +func getSourceAndTargetNode(cluster *cluster) (string, string) { + var sourceNode string + for server, tables := range cluster.servers { + if len(tables) == 0 { + continue } - log.Warn("http error", zap.ByteString("body", body)) - return errors.New(resp.Status) + sourceNode = server + break } - return nil + var targetNode string + for candidateNode := range cluster.servers { + if candidateNode != sourceNode { + targetNode = candidateNode + } + } + + if targetNode == "" { + log.Fatal("no target, unexpected") + } + + log.Info("Get source and target node", zap.String("sourceNode", sourceNode), zap.String("targetNode", targetNode)) + + return sourceNode, targetNode }