Skip to content

Commit

Permalink
api: fix move table API (#906)
Browse files Browse the repository at this point in the history
ref #442
  • Loading branch information
asddongmen authored Jan 20, 2025
1 parent 4d1d704 commit eee5de2
Show file tree
Hide file tree
Showing 12 changed files with 375 additions and 217 deletions.
10 changes: 8 additions & 2 deletions .github/workflows/integration_test_mysql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -334,17 +334,23 @@ jobs:
run: |
export TICDC_NEWARCH=true && make integration_test CASE=force_replicate_table
# the 12th case in this group
- name: Test tidb_mysql_test
if: ${{ success() }}
run: |
export TICDC_NEWARCH=true && make integration_test CASE=tidb_mysql_test
- name: Test resolve_lock
- name: Test resolve_lock
if: ${{ success() }}
run: |
export TICDC_NEWARCH=true && make integration_test CASE=resolve_lock
# The 14th case in this group
- name: Test move_table
if: ${{ success() }}
run: |
export TICDC_NEWARCH=true && make integration_test CASE=move_table
- name: Upload test logs
if: always()
uses: ./.github/actions/upload-test-logs
Expand Down
10 changes: 6 additions & 4 deletions api/v2/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func RegisterOpenAPIV2Routes(router *gin.Engine, api OpenAPIV2) {
// For compatibility with the old API.
// TiDB Operator relies on this API to determine whether the TiCDC node is healthy.
router.GET("/status", api.serverStatus)
// Intergration test relies on this API to determine whether the TiCDC node is healthy.
// Integration test relies on this API to determine whether the TiCDC node is healthy.
router.GET("/debug/info", gin.WrapF(api.handleDebugInfo))

coordinatorMiddleware := middleware.ForwardToCoordinatorMiddleware(api.server)
Expand All @@ -57,9 +57,11 @@ func RegisterOpenAPIV2Routes(router *gin.Engine, api OpenAPIV2) {
changefeedGroup.POST("/:changefeed_id/resume", coordinatorMiddleware, authenticateMiddleware, api.resumeChangefeed)
changefeedGroup.POST("/:changefeed_id/pause", coordinatorMiddleware, authenticateMiddleware, api.pauseChangefeed)
changefeedGroup.DELETE("/:changefeed_id", coordinatorMiddleware, authenticateMiddleware, api.deleteChangefeed)
changefeedGroup.POST("/:changefeed_id/move_table", coordinatorMiddleware, authenticateMiddleware, api.moveTable)
changefeedGroup.GET("/:changefeed_id/get_dispatcher_count", coordinatorMiddleware, api.getDispatcherCount)
changefeedGroup.GET("/:changefeed_id/tables", coordinatorMiddleware, api.listTables)

// internal APIs
changefeedGroup.POST("/:changefeed_id/move_table", authenticateMiddleware, api.moveTable)
changefeedGroup.GET("/:changefeed_id/get_dispatcher_count", api.getDispatcherCount)
changefeedGroup.GET("/:changefeed_id/tables", api.listTables)

// capture apis
captureGroup := v2.Group("/captures")
Expand Down
115 changes: 88 additions & 27 deletions api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/gin-gonic/gin"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/api/middleware"
"github.com/pingcap/ticdc/downstreamadapter/sink"
apperror "github.com/pingcap/ticdc/pkg/apperror"
"github.com/pingcap/ticdc/pkg/common"
Expand Down Expand Up @@ -200,9 +201,13 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
log.Info("Create changefeed successfully!",
zap.String("id", info.ChangefeedID.Name()),
zap.String("changefeed", info.String()))
c.JSON(http.StatusOK, toAPIModel(info,
info.StartTs, info.StartTs,
nil))
c.JSON(http.StatusOK, toAPIModel(
info,
&config.ChangeFeedStatus{
CheckpointTs: info.StartTs,
},
nil,
))
}

// listChangeFeeds lists all changefeeds in cdc cluster
Expand Down Expand Up @@ -291,15 +296,13 @@ func (h *OpenAPIV2) getChangeFeed(c *gin.Context) {
}

taskStatus := make([]model.CaptureTaskStatus, 0)
detail := toAPIModel(cfInfo, status.CheckpointTs,
status.CheckpointTs, taskStatus)
detail := toAPIModel(cfInfo, status, taskStatus)
c.JSON(http.StatusOK, detail)
}

func toAPIModel(
info *config.ChangeFeedInfo,
resolvedTs uint64,
checkpointTs uint64,
status *config.ChangeFeedStatus,
taskStatus []model.CaptureTaskStatus,
) *ChangeFeedInfo {
var runningError *RunningError
Expand Down Expand Up @@ -332,10 +335,12 @@ func toAPIModel(
State: info.State,
Error: runningError,
CreatorVersion: info.CreatorVersion,
CheckpointTs: checkpointTs,
ResolvedTs: resolvedTs,
CheckpointTime: model.JSONTime(oracle.GetTimeFromTS(checkpointTs)),
CheckpointTs: status.CheckpointTs,
ResolvedTs: status.CheckpointTs,
CheckpointTime: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)),
TaskStatus: taskStatus,
MaintainerAddr: status.GetMaintainerAddr(),
GID: info.ChangefeedID.ID(),
}
return apiInfoModel
}
Expand Down Expand Up @@ -618,7 +623,7 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) {
return
}

c.JSON(http.StatusOK, toAPIModel(oldCfInfo, status.CheckpointTs, status.CheckpointTs, nil))
c.JSON(http.StatusOK, toAPIModel(oldCfInfo, status, nil))
}

// verifyResumeChangefeedConfig verifies the changefeed config before resuming a changefeed
Expand Down Expand Up @@ -688,24 +693,42 @@ func (h *OpenAPIV2) moveTable(c *gin.Context) {
changefeedDisplayName.Name))
return
}

// get changefeedID first
coordinator, err := h.server.GetCoordinator()
cfInfo, err := getChangeFeed(c.Request.Host, changefeedDisplayName.Name)
if err != nil {
_ = c.Error(err)
return
}
cfInfo, _, err := coordinator.GetChangefeed(c, changefeedDisplayName)

if cfInfo.MaintainerAddr == "" {
_ = c.Error(errors.New("Can't not find maintainer for changefeed: " + changefeedDisplayName.Name))
return
}

selfInfo, err := h.server.SelfInfo()
if err != nil {
_ = c.Error(err)
return
}
changefeedID := cfInfo.ChangefeedID

if cfInfo.MaintainerAddr != selfInfo.AdvertiseAddr {
// Forward the request to the maintainer
middleware.ForwardToServer(c, selfInfo.ID, cfInfo.MaintainerAddr)
c.Abort()
return
}

changefeedID := common.ChangeFeedID{
Id: cfInfo.GID,
DisplayName: common.NewChangeFeedDisplayName(cfInfo.ID, cfInfo.Namespace),
}

maintainerManager := h.server.GetMaintainerManager()
maintainer, ok := maintainerManager.GetMaintainerForChangefeed(changefeedID)

if !ok {
log.Error("maintainer not found for changefeed in this node", zap.String("changefeed", changefeedID.String()))
log.Error("maintainer not found for changefeed in this node", zap.String("GID", changefeedID.Id.String()), zap.String("Name", changefeedID.DisplayName.String()))
_ = c.Error(apperror.ErrMaintainerNotFounded)
return
}
Expand All @@ -732,24 +755,40 @@ func (h *OpenAPIV2) listTables(c *gin.Context) {
return
}

coordinator, err := h.server.GetCoordinator()
// get changefeedID first
cfInfo, err := getChangeFeed(c.Request.Host, changefeedDisplayName.Name)
if err != nil {
_ = c.Error(err)
return
}

cfInfo, _, err := coordinator.GetChangefeed(c, changefeedDisplayName)
if cfInfo.MaintainerAddr == "" {
_ = c.Error(errors.New("Can't not find maintainer for changefeed: " + changefeedDisplayName.Name))
return
}

selfInfo, err := h.server.SelfInfo()
if err != nil {
_ = c.Error(err)
return
}

changefeedID := cfInfo.ChangefeedID
if cfInfo.MaintainerAddr != selfInfo.AdvertiseAddr {
// Forward the request to the maintainer
middleware.ForwardToServer(c, selfInfo.ID, cfInfo.MaintainerAddr)
c.Abort()
return
}

changefeedID := common.ChangeFeedID{
Id: cfInfo.GID,
DisplayName: common.NewChangeFeedDisplayName(cfInfo.ID, cfInfo.Namespace),
}

maintainerManager := h.server.GetMaintainerManager()
maintainer, ok := maintainerManager.GetMaintainerForChangefeed(changefeedID)
if !ok {
log.Error("maintainer not found for changefeed in this node", zap.String("changefeed", changefeedID.String()))
log.Error("maintainer not found for changefeed in this node", zap.String("GID", changefeedID.Id.String()), zap.String("Name", changefeedID.DisplayName.String()))
_ = c.Error(apperror.ErrMaintainerNotFounded)
return
}
Expand All @@ -768,11 +807,16 @@ func (h *OpenAPIV2) listTables(c *gin.Context) {
nodeTableInfo.addTableID(table.Span.TableID)
}

infos := make([]*NodeTableInfo, 0, len(nodeTableInfoMap))
infos := make([]NodeTableInfo, 0, len(nodeTableInfoMap))
for _, nodeTableInfo := range nodeTableInfoMap {
infos = append(infos, nodeTableInfo)
infos = append(infos, *nodeTableInfo)
}

resp := &ListResponse[NodeTableInfo]{
Total: len(infos),
Items: infos,
}
c.JSON(http.StatusOK, infos)
c.JSON(http.StatusOK, resp)
}

// getDispatcherCount returns the count of dispatcher.
Expand All @@ -784,24 +828,41 @@ func (h *OpenAPIV2) getDispatcherCount(c *gin.Context) {
changefeedDisplayName.Name))
return
}
// get changefeefID first
coordinator, err := h.server.GetCoordinator()

cfInfo, err := getChangeFeed(c.Request.Host, changefeedDisplayName.Name)
if err != nil {
_ = c.Error(err)
return
}
cfInfo, _, err := coordinator.GetChangefeed(c, changefeedDisplayName)

if cfInfo.MaintainerAddr == "" {
_ = c.Error(errors.New("Can't not find maintainer for changefeed: " + changefeedDisplayName.Name))
return
}

selfInfo, err := h.server.SelfInfo()
if err != nil {
_ = c.Error(err)
return
}
changefeedID := cfInfo.ChangefeedID

if cfInfo.MaintainerAddr != selfInfo.AdvertiseAddr {
// Forward the request to the maintainer
middleware.ForwardToServer(c, selfInfo.ID, cfInfo.MaintainerAddr)
c.Abort()
return
}

changefeedID := common.ChangeFeedID{
Id: cfInfo.GID,
DisplayName: common.NewChangeFeedDisplayName(cfInfo.ID, cfInfo.Namespace),
}

maintainerManager := h.server.GetMaintainerManager()
maintainer, ok := maintainerManager.GetMaintainerForChangefeed(changefeedID)

if !ok {
log.Error("maintainer not found for changefeed in this node", zap.String("changefeed", changefeedID.String()))
log.Error("maintainer not found for changefeed in this node", zap.String("GID", changefeedID.Id.String()), zap.String("changefeed", changefeedID.String()))
_ = c.Error(apperror.ErrMaintainerNotFounded)
return
}
Expand Down
81 changes: 81 additions & 0 deletions api/v2/helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package v2

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"time"

"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/tiflow/pkg/httputil"
"go.uber.org/zap"
)

func getChangeFeed(host, cfName string) (ChangeFeedInfo, error) {
security := config.GetGlobalServerConfig().Security

uri := fmt.Sprintf("/api/v2/changefeeds/%s", cfName)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

req, err := http.NewRequestWithContext(
ctx, "GET", uri, nil)
if err != nil {
return ChangeFeedInfo{}, err
}
req.URL.Host = host

if tls, _ := security.ToTLSConfigWithVerify(); tls != nil {
req.URL.Scheme = "https"
} else {
req.URL.Scheme = "http"
}

client, err := httputil.NewClient(security)
if err != nil {
return ChangeFeedInfo{}, err
}

log.Info("Send request to coordinator to get changefeed info",
zap.String("host", host),
zap.String("uri", uri),
zap.String("schema", req.URL.Scheme),
)

resp, err := client.Do(req)
if err != nil {
log.Error("failed to get changefeed", zap.Error(err), zap.String("uri", uri))
return ChangeFeedInfo{}, err
}
defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
if err != nil {
log.Error("failed to read changefeed response", zap.Error(err), zap.String("uri", uri))
return ChangeFeedInfo{}, err
}

var cfInfo ChangeFeedInfo
if err := json.Unmarshal(body, &cfInfo); err != nil {
log.Error("failed to unmarshal changefeed response", zap.Error(err), zap.String("uri", uri))
return ChangeFeedInfo{}, err
}

return cfInfo, nil
}
4 changes: 4 additions & 0 deletions api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"encoding/json"
"time"

"github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/tiflow/cdc/model"
Expand Down Expand Up @@ -1086,6 +1087,9 @@ type ChangeFeedInfo struct {
CheckpointTs uint64 `json:"checkpoint_ts"`
CheckpointTime model.JSONTime `json:"checkpoint_time"`
TaskStatus []model.CaptureTaskStatus `json:"task_status,omitempty"`

GID common.GID `json:"gid"`
MaintainerAddr string `json:"maintainer_addr,omitempty"`
}

// SyncedStatus describes the detail of a changefeed's synced status
Expand Down
Loading

0 comments on commit eee5de2

Please sign in to comment.