diff --git a/api/middleware/middleware.go b/api/middleware/middleware.go index 6a3ac6a28..59d078a7d 100644 --- a/api/middleware/middleware.go +++ b/api/middleware/middleware.go @@ -19,12 +19,12 @@ import ( "strconv" "time" + "github.com/flowbehappy/tigate/pkg/config" "github.com/flowbehappy/tigate/pkg/node" "github.com/gin-gonic/gin" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/api" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/httputil" "go.uber.org/zap" ) diff --git a/api/v2/changefeed.go b/api/v2/changefeed.go index f8b0718e6..ae9a948a7 100644 --- a/api/v2/changefeed.go +++ b/api/v2/changefeed.go @@ -19,6 +19,8 @@ import ( "net/url" "time" + "github.com/flowbehappy/tigate/pkg/config" + "github.com/flowbehappy/tigate/pkg/filter" "github.com/flowbehappy/tigate/version" "github.com/gin-gonic/gin" "github.com/google/uuid" @@ -27,7 +29,6 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/owner" "github.com/pingcap/tiflow/pkg/errors" - "github.com/pingcap/tiflow/pkg/filter" "github.com/pingcap/tiflow/pkg/txnutil/gc" "github.com/pingcap/tiflow/pkg/util" "github.com/tikv/client-go/v2/oracle" @@ -128,7 +129,7 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) { } pdClient := h.server.GetPdClient() - info := &model.ChangeFeedInfo{ + info := &config.ChangeFeedInfo{ UpstreamID: pdClient.GetClusterID(ctx), Namespace: cfg.Namespace, ID: cfg.ID, @@ -265,7 +266,7 @@ func (h *OpenAPIV2) getChangeFeed(c *gin.Context) { } func toAPIModel( - info *model.ChangeFeedInfo, + info *config.ChangeFeedInfo, resolvedTs uint64, checkpointTs uint64, taskStatus []model.CaptureTaskStatus, @@ -555,7 +556,7 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) { } // verify changefeed filter - _, err = filter.NewFilter(oldCfInfo.Config, "") + _, err = filter.NewFilter(oldCfInfo.Config.Filter, "", oldCfInfo.Config.CaseSensitive) if err != nil { _ = c.Error(errors.ErrChangefeedUpdateRefused. 
GenWithStackByArgs(errors.Cause(err).Error())) diff --git a/api/v2/model.go b/api/v2/model.go index 6e5f5c551..5651a567c 100644 --- a/api/v2/model.go +++ b/api/v2/model.go @@ -17,10 +17,10 @@ import ( "encoding/json" "time" + "github.com/flowbehappy/tigate/pkg/config" "github.com/pingcap/errors" "github.com/pingcap/tiflow/cdc/model" bf "github.com/pingcap/tiflow/pkg/binlog-filter" - "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/integrity" "github.com/pingcap/tiflow/pkg/security" diff --git a/cmd/cli/cli_changefeed_create.go b/cmd/cli/cli_changefeed_create.go index bbe32e17d..0e85f9629 100644 --- a/cmd/cli/cli_changefeed_create.go +++ b/cmd/cli/cli_changefeed_create.go @@ -23,13 +23,13 @@ import ( v2 "github.com/flowbehappy/tigate/api/v2" "github.com/flowbehappy/tigate/cmd/factory" apiv2client "github.com/flowbehappy/tigate/pkg/api/v2" + "github.com/flowbehappy/tigate/pkg/config" + "github.com/flowbehappy/tigate/pkg/filter" "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" cmdcontext "github.com/pingcap/tiflow/pkg/cmd/context" "github.com/pingcap/tiflow/pkg/cmd/util" - "github.com/pingcap/tiflow/pkg/config" - "github.com/pingcap/tiflow/pkg/filter" putil "github.com/pingcap/tiflow/pkg/util" "github.com/spf13/cobra" "github.com/tikv/client-go/v2/oracle" diff --git a/coordinator/changefeed/changefeed.go b/coordinator/changefeed/changefeed.go index 0b02fd84d..747ee6698 100644 --- a/coordinator/changefeed/changefeed.go +++ b/coordinator/changefeed/changefeed.go @@ -18,6 +18,7 @@ import ( "net/url" "github.com/flowbehappy/tigate/heartbeatpb" + "github.com/flowbehappy/tigate/pkg/config" "github.com/flowbehappy/tigate/pkg/messaging" "github.com/flowbehappy/tigate/pkg/node" "github.com/pingcap/log" @@ -30,7 +31,7 @@ import ( // Changefeed is the in-memory representation of a changefeed's info and status type Changefeed struct { ID model.ChangeFeedID - Info *model.ChangeFeedInfo + Info *config.ChangeFeedInfo IsMQSink bool nodeID node.ID @@ -44,7 +45,7 @@ type Changefeed struct { // NewChangefeed creates a new changefeed instance func NewChangefeed(cfID model.ChangeFeedID, - info *model.ChangeFeedInfo, + info *config.ChangeFeedInfo, checkpointTs uint64) *Changefeed { uri, err := url.Parse(info.SinkURI) if err != nil { diff --git a/coordinator/changefeed/changefeed_db.go b/coordinator/changefeed/changefeed_db.go index c721454ea..d4696b69d 100644 --- a/coordinator/changefeed/changefeed_db.go +++ b/coordinator/changefeed/changefeed_db.go @@ -17,6 +17,7 @@ import ( "math" "sync" + "github.com/flowbehappy/tigate/pkg/config" "github.com/flowbehappy/tigate/pkg/node" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" @@ -321,7 +322,7 @@ func (db *ChangefeedDB) CalculateGCSafepoint() uint64 { } // ReplaceStoppedChangefeed updates the stopped changefeed -func (db *ChangefeedDB) ReplaceStoppedChangefeed(cf *model.ChangeFeedInfo) { +func (db *ChangefeedDB) ReplaceStoppedChangefeed(cf *config.ChangeFeedInfo) { db.lock.Lock() defer db.lock.Unlock() diff --git a/coordinator/changefeed/changefeed_db_backend.go b/coordinator/changefeed/changefeed_db_backend.go index eaf88eff2..1e51e14fe 100644 --- a/coordinator/changefeed/changefeed_db_backend.go +++ b/coordinator/changefeed/changefeed_db_backend.go @@ -16,6 +16,7 @@ package changefeed import ( "context" + "github.com/flowbehappy/tigate/pkg/config" "github.com/pingcap/tiflow/cdc/model" ) @@ -24,9 +25,9 @@ type Backend interface { // GetAllChangefeeds returns all
changefeeds from the backend db, including stopped and failed changefeeds GetAllChangefeeds(ctx context.Context) (map[model.ChangeFeedID]*ChangefeedMetaWrapper, error) // CreateChangefeed saves changefeed info and status to db - CreateChangefeed(ctx context.Context, info *model.ChangeFeedInfo) error + CreateChangefeed(ctx context.Context, info *config.ChangeFeedInfo) error // UpdateChangefeed updates changefeed info to db - UpdateChangefeed(ctx context.Context, info *model.ChangeFeedInfo) error + UpdateChangefeed(ctx context.Context, info *config.ChangeFeedInfo) error // PauseChangefeed persists the pause status to db for a changefeed PauseChangefeed(ctx context.Context, id model.ChangeFeedID) error // DeleteChangefeed removes all related info of a changefeed from db @@ -39,6 +40,6 @@ type Backend interface { // ChangefeedMetaWrapper is a wrapper for the changefeed loaded from the DB type ChangefeedMetaWrapper struct { - Info *model.ChangeFeedInfo + Info *config.ChangeFeedInfo Status *model.ChangeFeedStatus } diff --git a/coordinator/changefeed/etcd_backend.go b/coordinator/changefeed/etcd_backend.go index 3ef4d14cb..92cb1d67d 100644 --- a/coordinator/changefeed/etcd_backend.go +++ b/coordinator/changefeed/etcd_backend.go @@ -19,6 +19,7 @@ import ( "fmt" "strings" + "github.com/flowbehappy/tigate/pkg/config" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/errors" @@ -69,7 +70,7 @@ func (b *EtcdBackend) GetAllChangefeeds(ctx context.Context) (map[model.ChangeFe } meta.Status = status } else { - detail := &model.ChangeFeedInfo{} + detail := &config.ChangeFeedInfo{} err = detail.Unmarshal(kv.Value) if err != nil { log.Warn("failed to unmarshal change feed Info, ignore", @@ -113,7 +114,7 @@ func (b *EtcdBackend) GetAllChangefeeds(ctx context.Context) (map[model.ChangeFe } func (b *EtcdBackend) CreateChangefeed(ctx context.Context, - info *model.ChangeFeedInfo) error { + info *config.ChangeFeedInfo) error { changefeedID := model.DefaultChangeFeedID(info.ID) infoKey := etcd.GetEtcdKeyChangeFeedInfo(b.etcdClient.GetClusterID(), changefeedID) infoValue, err := info.Marshal() @@ -146,7 +147,7 @@ func (b *EtcdBackend) CreateChangefeed(ctx context.Context, return nil } -func (b *EtcdBackend) UpdateChangefeed(ctx context.Context, info *model.ChangeFeedInfo) error { +func (b *EtcdBackend) UpdateChangefeed(ctx context.Context, info *config.ChangeFeedInfo) error { infoKey := etcd.GetEtcdKeyChangeFeedInfo(b.etcdClient.GetClusterID(), model.DefaultChangeFeedID(info.ID)) newStr, err := info.Marshal() if err != nil { diff --git a/coordinator/controller.go b/coordinator/controller.go index 08abad896..d0ebfcca3 100644 --- a/coordinator/controller.go +++ b/coordinator/controller.go @@ -23,6 +23,7 @@ import ( "github.com/flowbehappy/tigate/heartbeatpb" "github.com/flowbehappy/tigate/pkg/bootstrap" appcontext "github.com/flowbehappy/tigate/pkg/common/context" + "github.com/flowbehappy/tigate/pkg/config" "github.com/flowbehappy/tigate/pkg/messaging" "github.com/flowbehappy/tigate/pkg/metrics" "github.com/flowbehappy/tigate/pkg/node" @@ -148,7 +149,7 @@ func (c *Controller) HandleEvent(event *Event) bool { return false } -func (c *Controller) CreateChangefeed(ctx context.Context, info *model.ChangeFeedInfo) error { +func (c *Controller) CreateChangefeed(ctx context.Context, info *config.ChangeFeedInfo) error { if !c.bootstrapped { return errors.New("not initialized, wait a moment") } @@ -387,7 +388,7 @@ func (c *Controller) ResumeChangefeed(ctx context.Context, id
model.ChangeFeedID return nil } -func (c *Controller) UpdateChangefeed(ctx context.Context, change *model.ChangeFeedInfo) error { +func (c *Controller) UpdateChangefeed(ctx context.Context, change *config.ChangeFeedInfo) error { id := model.ChangeFeedID{ Namespace: change.Namespace, ID: change.ID, @@ -403,9 +404,9 @@ func (c *Controller) UpdateChangefeed(ctx context.Context, change *model.ChangeF return nil } -func (c *Controller) ListChangefeeds(ctx context.Context) ([]*model.ChangeFeedInfo, []*model.ChangeFeedStatus, error) { +func (c *Controller) ListChangefeeds(ctx context.Context) ([]*config.ChangeFeedInfo, []*model.ChangeFeedStatus, error) { cfs := c.changefeedDB.GetAllChangefeeds() - infos := make([]*model.ChangeFeedInfo, 0, len(cfs)) + infos := make([]*config.ChangeFeedInfo, 0, len(cfs)) statuses := make([]*model.ChangeFeedStatus, 0, len(cfs)) for _, cf := range cfs { infos = append(infos, cf.Info) @@ -414,7 +415,7 @@ func (c *Controller) ListChangefeeds(ctx context.Context) ([]*model.ChangeFeedIn return infos, statuses, nil } -func (c *Controller) GetChangefeed(ctx context.Context, id model.ChangeFeedID) (*model.ChangeFeedInfo, *model.ChangeFeedStatus, error) { +func (c *Controller) GetChangefeed(ctx context.Context, id model.ChangeFeedID) (*config.ChangeFeedInfo, *model.ChangeFeedStatus, error) { cf := c.changefeedDB.GetByID(id) if cf == nil { return nil, nil, cerror.ErrChangeFeedNotExists.GenWithStackByArgs(id.ID) diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index c5fc2549c..a6cb453e0 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -21,6 +21,7 @@ import ( "github.com/flowbehappy/tigate/coordinator/changefeed" "github.com/flowbehappy/tigate/heartbeatpb" appcontext "github.com/flowbehappy/tigate/pkg/common/context" + "github.com/flowbehappy/tigate/pkg/config" "github.com/flowbehappy/tigate/pkg/messaging" "github.com/flowbehappy/tigate/pkg/metrics" "github.com/flowbehappy/tigate/pkg/node" @@ -142,7 +143,7 @@ func (c *coordinator) Run(ctx context.Context) error { } } -func (c *coordinator) CreateChangefeed(ctx context.Context, info *model.ChangeFeedInfo) error { +func (c *coordinator) CreateChangefeed(ctx context.Context, info *config.ChangeFeedInfo) error { return c.controller.CreateChangefeed(ctx, info) } @@ -158,15 +159,15 @@ func (c *coordinator) ResumeChangefeed(ctx context.Context, id model.ChangeFeedI return c.controller.ResumeChangefeed(ctx, id, newCheckpointTs) } -func (c *coordinator) UpdateChangefeed(ctx context.Context, change *model.ChangeFeedInfo) error { +func (c *coordinator) UpdateChangefeed(ctx context.Context, change *config.ChangeFeedInfo) error { return c.controller.UpdateChangefeed(ctx, change) } -func (c *coordinator) ListChangefeeds(ctx context.Context) ([]*model.ChangeFeedInfo, []*model.ChangeFeedStatus, error) { +func (c *coordinator) ListChangefeeds(ctx context.Context) ([]*config.ChangeFeedInfo, []*model.ChangeFeedStatus, error) { return c.controller.ListChangefeeds(ctx) } -func (c *coordinator) GetChangefeed(ctx context.Context, id model.ChangeFeedID) (*model.ChangeFeedInfo, *model.ChangeFeedStatus, error) { +func (c *coordinator) GetChangefeed(ctx context.Context, id model.ChangeFeedID) (*config.ChangeFeedInfo, *model.ChangeFeedStatus, error) { return c.controller.GetChangefeed(ctx, id) } diff --git a/downstreamadapter/dispatchermanager/event_dispatcher_manager.go b/downstreamadapter/dispatchermanager/event_dispatcher_manager.go index f4834e855..c15d4f27a 100644 --- 
a/downstreamadapter/dispatchermanager/event_dispatcher_manager.go +++ b/downstreamadapter/dispatchermanager/event_dispatcher_manager.go @@ -38,7 +38,6 @@ import ( "github.com/flowbehappy/tigate/pkg/config" "github.com/flowbehappy/tigate/pkg/metrics" "github.com/pingcap/tiflow/cdc/model" - cfg "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" ) @@ -134,7 +133,7 @@ func NewEventDispatcherManager(changefeedID model.ChangeFeedID, // Set Filter // TODO: eventually update the internal NewFilter function of the filter package; for now this is just an adapter shim - replicaConfig := cfg.ReplicaConfig{Filter: cfConfig.Filter} + replicaConfig := config.ReplicaConfig{Filter: cfConfig.Filter} filter, err := filter.NewFilter(replicaConfig.Filter, cfConfig.TimeZone, replicaConfig.CaseSensitive) if err != nil { return nil, 0, apperror.ErrCreateEventDispatcherManagerFailed.Wrap(err).GenWithStackByArgs("create filter failed") } @@ -463,7 +462,7 @@ func (e *EventDispatcherManager) cleanTableEventDispatcher(id common.DispatcherI log.Info("table event dispatcher completely stopped, and delete it from event dispatcher manager", zap.Any("dispatcher id", id)) } -func toFilterConfigPB(filter *cfg.FilterConfig) *eventpb.FilterConfig { +func toFilterConfigPB(filter *config.FilterConfig) *eventpb.FilterConfig { filterConfig := &eventpb.FilterConfig{ Rules: filter.Rules, IgnoreTxnStartTs: filter.IgnoreTxnStartTs, diff --git a/downstreamadapter/sink/helper/eventrouter/event_router.go b/downstreamadapter/sink/helper/eventrouter/event_router.go index 653cd9478..22c3fb2e7 100644 --- a/downstreamadapter/sink/helper/eventrouter/event_router.go +++ b/downstreamadapter/sink/helper/eventrouter/event_router.go @@ -18,10 +18,9 @@ import ( "github.com/flowbehappy/tigate/downstreamadapter/sink/helper/eventrouter/topic" "github.com/flowbehappy/tigate/pkg/common" commonEvent "github.com/flowbehappy/tigate/pkg/common/event" - ticonfig "github.com/flowbehappy/tigate/pkg/config" + "github.com/flowbehappy/tigate/pkg/config" "github.com/pingcap/log" tableFilter "github.com/pingcap/tidb/pkg/util/table-filter" - "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" ) @@ -39,11 +38,11 @@ type EventRouter struct { } // NewEventRouter creates a new EventRouter. -func NewEventRouter(sinkConfig *ticonfig.SinkConfig, protocol config.Protocol, defaultTopic, scheme string) (*EventRouter, error) { +func NewEventRouter(sinkConfig *config.SinkConfig, protocol config.Protocol, defaultTopic, scheme string) (*EventRouter, error) { // If an event does not match any dispatching rules in the config file, // it will be dispatched by the default partition dispatcher and // static topic dispatcher because it matches *.* rule.
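As background for the catch-all rule described above: `*.*` matches every schema.table pair, so appending it as the final dispatch rule guarantees each event is routed by at least one matcher. A standalone sketch (not part of the change) using the table-filter package this file imports:

```go
package main

import (
	"fmt"

	tableFilter "github.com/pingcap/tidb/pkg/util/table-filter"
)

func main() {
	// "*.*" is a wildcard over both schema and table name, so any
	// event that fell through the user-defined rules still matches here.
	f, err := tableFilter.Parse([]string{"*.*"})
	if err != nil {
		panic(err)
	}
	fmt.Println(f.MatchTable("test", "orders")) // true
	fmt.Println(f.MatchTable("mysql", "user"))  // true: every table matches
}
```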
- ruleConfigs := append(sinkConfig.DispatchRules, &ticonfig.DispatchRule{ + ruleConfigs := append(sinkConfig.DispatchRules, &config.DispatchRule{ Matcher: []string{"*.*"}, PartitionRule: "default", TopicRule: "", diff --git a/downstreamadapter/sink/helper/eventrouter/topic/expression.go b/downstreamadapter/sink/helper/eventrouter/topic/expression.go index 1191358e0..d8e4dab73 100644 --- a/downstreamadapter/sink/helper/eventrouter/topic/expression.go +++ b/downstreamadapter/sink/helper/eventrouter/topic/expression.go @@ -4,7 +4,7 @@ import ( "regexp" "strings" - "github.com/pingcap/tiflow/pkg/config" + "github.com/flowbehappy/tigate/pkg/config" "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/sink" ) diff --git a/downstreamadapter/sink/helper/eventrouter/topic/topic.go b/downstreamadapter/sink/helper/eventrouter/topic/topic.go index 2f2e8bee4..5348cbdcf 100644 --- a/downstreamadapter/sink/helper/eventrouter/topic/topic.go +++ b/downstreamadapter/sink/helper/eventrouter/topic/topic.go @@ -1,7 +1,7 @@ package topic import ( - "github.com/pingcap/tiflow/pkg/config" + "github.com/flowbehappy/tigate/pkg/config" ) type TopicGeneratorType int diff --git a/downstreamadapter/sink/helper/helper.go b/downstreamadapter/sink/helper/helper.go index 38771dacf..e2d1787fe 100644 --- a/downstreamadapter/sink/helper/helper.go +++ b/downstreamadapter/sink/helper/helper.go @@ -1,9 +1,23 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + package helper import ( "net/url" "strings" + "github.com/flowbehappy/tigate/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" ) @@ -17,3 +31,30 @@ func GetTopic(sinkURI *url.URL) (string, error) { } return topic, nil } + +// GetProtocol parses the given protocol string into a config.Protocol.
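For orientation, a hedged sketch of how the two helpers added to this file (GetProtocol just below, GetFileExtension after it) compose; `extensionFor` is an illustrative caller, not part of the diff:

```go
import "github.com/flowbehappy/tigate/downstreamadapter/sink/helper"

// extensionFor resolves the storage file extension for a protocol name,
// e.g. ".json" for "canal-json" per the switch in GetFileExtension.
func extensionFor(protocolStr string) (string, error) {
	protocol, err := helper.GetProtocol(protocolStr)
	if err != nil {
		// unknown protocol names surface as a wrapped ErrKafkaInvalidConfig
		return "", err
	}
	return helper.GetFileExtension(protocol), nil
}
```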
+func GetProtocol(protocolStr string) (config.Protocol, error) { + protocol, err := config.ParseSinkProtocolFromString(protocolStr) + if err != nil { + return protocol, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) + } + + return protocol, nil +} + +// GetFileExtension returns the extension for specific protocol +func GetFileExtension(protocol config.Protocol) string { + switch protocol { + case config.ProtocolAvro, config.ProtocolCanalJSON, config.ProtocolMaxwell, + config.ProtocolOpen, config.ProtocolSimple: + return ".json" + case config.ProtocolCraft: + return ".craft" + case config.ProtocolCanal: + return ".canal" + case config.ProtocolCsv: + return ".csv" + default: + return ".unknown" + } +} diff --git a/downstreamadapter/sink/kafka_sink.go b/downstreamadapter/sink/kafka_sink.go index 28020fffa..60604dce8 100644 --- a/downstreamadapter/sink/kafka_sink.go +++ b/downstreamadapter/sink/kafka_sink.go @@ -17,6 +17,7 @@ import ( "context" "net/url" + "github.com/flowbehappy/tigate/downstreamadapter/sink/helper" "github.com/flowbehappy/tigate/downstreamadapter/sink/helper/eventrouter" "github.com/flowbehappy/tigate/downstreamadapter/sink/helper/topicmanager" "github.com/flowbehappy/tigate/downstreamadapter/sink/types" @@ -35,7 +36,6 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/ddlsink/mq/ddlproducer" - "github.com/pingcap/tiflow/cdc/sink/util" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/sink" utils "github.com/pingcap/tiflow/pkg/util" @@ -54,12 +54,12 @@ func (s *KafkaSink) SinkType() SinkType { func NewKafkaSink(changefeedID model.ChangeFeedID, sinkURI *url.URL, sinkConfig *ticonfig.SinkConfig) (*KafkaSink, error) { ctx := context.Background() - topic, err := util.GetTopic(sinkURI) + topic, err := helper.GetTopic(sinkURI) if err != nil { return nil, errors.Trace(err) } scheme := sink.GetScheme(sinkURI) - protocol, err := util.GetProtocol(utils.GetOrZero(sinkConfig.Protocol)) + protocol, err := helper.GetProtocol(utils.GetOrZero(sinkConfig.Protocol)) if err != nil { return nil, errors.Trace(err) } diff --git a/downstreamadapter/worker/kafka_ddl_worker.go b/downstreamadapter/worker/kafka_ddl_worker.go index ed1e46ab9..153a9928c 100644 --- a/downstreamadapter/worker/kafka_ddl_worker.go +++ b/downstreamadapter/worker/kafka_ddl_worker.go @@ -8,6 +8,7 @@ import ( "github.com/flowbehappy/tigate/downstreamadapter/sink/helper/eventrouter" "github.com/flowbehappy/tigate/downstreamadapter/sink/helper/topicmanager" commonEvent "github.com/flowbehappy/tigate/pkg/common/event" + "github.com/flowbehappy/tigate/pkg/config" "github.com/flowbehappy/tigate/pkg/metrics" "github.com/flowbehappy/tigate/pkg/sink/codec/encoder" "github.com/flowbehappy/tigate/pkg/sink/util" @@ -15,7 +16,6 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sink/ddlsink/mq/ddlproducer" - "github.com/pingcap/tiflow/pkg/config" "go.uber.org/zap" ) diff --git a/downstreamadapter/worker/kafka_worker.go b/downstreamadapter/worker/kafka_worker.go index adc6ee69e..29050e916 100644 --- a/downstreamadapter/worker/kafka_worker.go +++ b/downstreamadapter/worker/kafka_worker.go @@ -18,6 +18,7 @@ import ( "sync" "time" + "github.com/flowbehappy/tigate/pkg/config" "go.uber.org/atomic" "github.com/flowbehappy/tigate/downstreamadapter/sink/helper/eventrouter" @@ -30,7 +31,6 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" 
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dmlproducer" - "github.com/pingcap/tiflow/pkg/config" "go.uber.org/zap" ) diff --git a/maintainer/maintainer_controller.go b/maintainer/maintainer_controller.go index 89c827378..2fe646704 100644 --- a/maintainer/maintainer_controller.go +++ b/maintainer/maintainer_controller.go @@ -26,6 +26,7 @@ import ( "github.com/flowbehappy/tigate/pkg/common" appcontext "github.com/flowbehappy/tigate/pkg/common/context" commonEvent "github.com/flowbehappy/tigate/pkg/common/event" + "github.com/flowbehappy/tigate/pkg/config" "github.com/flowbehappy/tigate/pkg/messaging" "github.com/flowbehappy/tigate/pkg/node" "github.com/flowbehappy/tigate/server/watcher" @@ -33,7 +34,6 @@ import ( "github.com/flowbehappy/tigate/utils/threadpool" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/spanz" "go.uber.org/zap" diff --git a/maintainer/split/splitter.go b/maintainer/split/splitter.go index 64c327dbf..ddea25dc3 100644 --- a/maintainer/split/splitter.go +++ b/maintainer/split/splitter.go @@ -19,10 +19,10 @@ import ( "github.com/flowbehappy/tigate/heartbeatpb" "github.com/flowbehappy/tigate/maintainer/replica" + "github.com/flowbehappy/tigate/pkg/config" "github.com/flowbehappy/tigate/utils" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/pdutil" "github.com/tikv/client-go/v2/tikv" "go.uber.org/zap" diff --git a/pkg/config/cdc_v2.go b/pkg/config/cdc_v2.go deleted file mode 100644 index 1869e45d4..000000000 --- a/pkg/config/cdc_v2.go +++ /dev/null @@ -1,133 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package config - -import ( - "fmt" - "net" - "net/url" - - dmysql "github.com/go-sql-driver/mysql" - "github.com/pingcap/errors" - cerror "github.com/pingcap/tiflow/pkg/errors" - "github.com/pingcap/tiflow/pkg/security" -) - -// CDCV2 represents config for ticdc v2 -type CDCV2 struct { - // Enable represents if the cdc v2 is enabled or not - Enable bool `toml:"enable" json:"enable"` - // MetaStoreConfig represents config for new meta store configurations - MetaStoreConfig MetaStoreConfiguration `toml:"meta-store" json:"meta-store"` -} - -// MetaStoreConfiguration represents config for new meta store configurations -type MetaStoreConfiguration struct { - // URI is the address of the meta store. - // for example: "mysql://127.0.0.1:3306/test" - URI string `toml:"uri" json:"uri"` - // SSLCA is the path of the CA certificate file. 
- SSLCa string `toml:"ssl-ca" json:"ssl-ca"` - SSLCert string `toml:"ssl-cert" json:"ssl-cert"` - SSLKey string `toml:"ssl-key" json:"ssl-key"` -} - -// ValidateAndAdjust validates the meta store configurations -func (c *CDCV2) ValidateAndAdjust() error { - if !c.Enable { - return nil - } - if c.MetaStoreConfig.URI == "" { - return errors.New("missing meta store uri configuration") - } - parsedURI, err := url.Parse(c.MetaStoreConfig.URI) - if err != nil { - return errors.Trace(err) - } - if !isSupportedScheme(parsedURI.Scheme) { - return errors.Errorf("the %s scheme is not supported by meta store", parsedURI.Scheme) - } - return nil -} - -// GenDSN generates a DSN from the given metastore config. -func (cfg *MetaStoreConfiguration) GenDSN() (*dmysql.Config, error) { - endpoint, err := url.Parse(cfg.URI) - if err != nil { - return nil, errors.Trace(err) - } - tls, err := cfg.getSSLParam() - if err != nil { - return nil, errors.Trace(err) - } - username := endpoint.User.Username() - if username == "" { - username = "root" - } - password, _ := endpoint.User.Password() - - hostName := endpoint.Hostname() - port := endpoint.Port() - if port == "" { - port = "3306" - } - - // This will handle the IPv6 address format. - var dsn *dmysql.Config - host := net.JoinHostPort(hostName, port) - // dsn format of the driver: - // [username[:password]@][protocol[(address)]]/dbname[?param1=value1&...¶mN=valueN] - dsnStr := fmt.Sprintf("%s:%s@tcp(%s)%s%s", username, password, host, endpoint.Path, tls) - if dsn, err = dmysql.ParseDSN(dsnStr); err != nil { - return nil, errors.Trace(err) - } - - // create test db used for parameter detection - // Refer https://github.com/go-sql-driver/mysql#parameters - if dsn.Params == nil { - dsn.Params = make(map[string]string) - } - // enable parseTime for time.Time type - dsn.Params["parseTime"] = "true" - for key, pa := range endpoint.Query() { - dsn.Params[key] = pa[0] - } - return dsn, nil -} - -func (cfg *MetaStoreConfiguration) getSSLParam() (string, error) { - if len(cfg.SSLCa) == 0 || len(cfg.SSLCert) == 0 || len(cfg.SSLKey) == 0 { - return "", nil - } - credential := security.Credential{ - CAPath: cfg.SSLCa, - CertPath: cfg.SSLCert, - KeyPath: cfg.SSLKey, - } - tlsCfg, err := credential.ToTLSConfig() - if err != nil { - return "", errors.Trace(err) - } - name := "cdc_mysql_tls_meta_store" - err = dmysql.RegisterTLSConfig(name, tlsCfg) - if err != nil { - return "", cerror.ErrMySQLConnectionError.Wrap(err).GenWithStack("fail to open MySQL connection") - } - return "?tls=" + name, nil -} - -// isSupportedScheme returns true if the scheme is compatible with MySQL. 
-func isSupportedScheme(scheme string) bool { - return scheme == "mysql" -} diff --git a/pkg/config/changefeed.go b/pkg/config/changefeed.go index 91f9ee51a..67a0de4c5 100644 --- a/pkg/config/changefeed.go +++ b/pkg/config/changefeed.go @@ -1,10 +1,20 @@ package config import ( + "encoding/json" + "math" + "net/url" "time" + "github.com/pingcap/errors" + "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/config" + cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/sink" + "github.com/pingcap/tiflow/pkg/util" + "github.com/pingcap/tiflow/pkg/version" + "github.com/tikv/client-go/v2/oracle" + "go.uber.org/zap" ) type ChangefeedConfig struct { @@ -16,8 +26,8 @@ type ChangefeedConfig struct { // timezone used when checking sink uri TimeZone string `json:"timezone" default:"system"` // if true, force to replicate some ineligible tables - ForceReplicate bool `json:"force_replicate" default:"false"` - Filter *config.FilterConfig `toml:"filter" json:"filter"` + ForceReplicate bool `json:"force_replicate" default:"false"` + Filter *FilterConfig `toml:"filter" json:"filter"` // sync point related // TODO: can syncPointRetention and its default be dropped? @@ -56,3 +66,377 @@ type ChangeFeedInfo struct { // Epoch is the epoch of a changefeed, changes on every restart. Epoch uint64 `json:"epoch"` } + +// NeedBlockGC returns true if the changefeed needs to block the GC safepoint. +// Note: if the changefeed is failed by GC, it should not block the GC safepoint. +func (info *ChangeFeedInfo) NeedBlockGC() bool { + switch info.State { + case model.StateNormal, model.StateStopped, model.StatePending, model.StateWarning: + return true + case model.StateFailed: + return !info.isFailedByGC() + case model.StateFinished, model.StateRemoved: + default: + } + return false +} + +func (info *ChangeFeedInfo) isFailedByGC() bool { + if info.Error == nil { + log.Panic("changefeed info is not consistent", + zap.Any("state", info.State), zap.Any("error", info.Error)) + } + return cerror.IsChangefeedGCFastFailErrorCode(errors.RFCErrorCode(info.Error.Code)) +} + +// String implements the fmt.Stringer interface, but hides some sensitive information +func (info *ChangeFeedInfo) String() (str string) { + var err error + str, err = info.Marshal() + if err != nil { + log.Error("failed to marshal changefeed info", zap.Error(err)) + return + } + clone := new(ChangeFeedInfo) + err = clone.Unmarshal([]byte(str)) + if err != nil { + log.Error("failed to unmarshal changefeed info", zap.Error(err)) + return + } + + clone.SinkURI = util.MaskSensitiveDataInURI(clone.SinkURI) + if clone.Config != nil { + clone.Config.MaskSensitiveData() + } + + str, err = clone.Marshal() + if err != nil { + log.Error("failed to marshal changefeed info", zap.Error(err)) + } + return +} + +// GetStartTs returns StartTs if it's specified, otherwise a ts derived from the changefeed's CreateTime. +func (info *ChangeFeedInfo) GetStartTs() uint64 { + if info.StartTs > 0 { + return info.StartTs + } + + return oracle.GoTimeToTS(info.CreateTime) +} + +// GetCheckpointTs returns CheckpointTs if it's specified in ChangeFeedStatus, otherwise StartTs is returned. +func (info *ChangeFeedInfo) GetCheckpointTs(status *model.ChangeFeedStatus) uint64 { + if status != nil { + return status.CheckpointTs + } + return info.GetStartTs() +} + +// GetTargetTs returns TargetTs if it's specified, otherwise MaxUint64 is returned.
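The timestamp getters in this block resolve in a fixed order: an explicit StartTs wins, otherwise a TSO is derived from the creation time, and a checkpoint (when present) overrides both. A minimal standalone sketch of the start-ts fallback, assuming client-go's oracle package as imported above:

```go
import (
	"time"

	"github.com/tikv/client-go/v2/oracle"
)

// resolveStartTs mirrors ChangeFeedInfo.GetStartTs: prefer the explicit
// start ts; otherwise convert the creation time into a TSO timestamp.
func resolveStartTs(startTs uint64, createTime time.Time) uint64 {
	if startTs > 0 {
		return startTs
	}
	return oracle.GoTimeToTS(createTime)
}
```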
+func (info *ChangeFeedInfo) GetTargetTs() uint64 { + if info.TargetTs > 0 { + return info.TargetTs + } + return uint64(math.MaxUint64) +} + +// Marshal returns the json marshal format of a ChangeFeedInfo +func (info *ChangeFeedInfo) Marshal() (string, error) { + data, err := json.Marshal(info) + return string(data), cerror.WrapError(cerror.ErrMarshalFailed, err) +} + +// Unmarshal unmarshals into *ChangeFeedInfo from json marshal byte slice +func (info *ChangeFeedInfo) Unmarshal(data []byte) error { + err := json.Unmarshal(data, &info) + if err != nil { + return errors.Annotatef( + cerror.WrapError(cerror.ErrUnmarshalFailed, err), "Unmarshal data: %v", data) + } + return nil +} + +// Clone returns a cloned ChangeFeedInfo +func (info *ChangeFeedInfo) Clone() (*ChangeFeedInfo, error) { + s, err := info.Marshal() + if err != nil { + return nil, err + } + cloned := new(ChangeFeedInfo) + err = cloned.Unmarshal([]byte(s)) + return cloned, err +} + +// VerifyAndComplete verifies changefeed info and may fill in some fields. +// If a necessary field is missing but has a usable default value, it is filled in. +func (info *ChangeFeedInfo) VerifyAndComplete() { + defaultConfig := GetDefaultReplicaConfig() + if info.Config.Filter == nil { + info.Config.Filter = defaultConfig.Filter + } + if info.Config.Mounter == nil { + info.Config.Mounter = defaultConfig.Mounter + } + if info.Config.Sink == nil { + info.Config.Sink = defaultConfig.Sink + } + if info.Config.Consistent == nil { + info.Config.Consistent = defaultConfig.Consistent + } + if info.Config.Scheduler == nil { + info.Config.Scheduler = defaultConfig.Scheduler + } + + if info.Config.Integrity == nil { + info.Config.Integrity = defaultConfig.Integrity + } + if info.Config.ChangefeedErrorStuckDuration == nil { + info.Config.ChangefeedErrorStuckDuration = defaultConfig.ChangefeedErrorStuckDuration + } + if info.Config.SyncedStatus == nil { + info.Config.SyncedStatus = defaultConfig.SyncedStatus + } + info.RmUnusedFields() +} + +// RmUnusedFields removes unnecessary fields based on the downstream type and +// the protocol. Since we utilize a common changefeed configuration template, +// certain fields may not be utilized for certain protocols.
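Before the full implementation that follows, a condensed sketch of the dispatch RmUnusedFields performs, written as if inside this config package; the scheme predicates come from tiflow's pkg/sink, which the file imports, and `pruneForScheme` is an illustrative name:

```go
import "github.com/pingcap/tiflow/pkg/sink"

// pruneForScheme condenses RmUnusedFields: each sink scheme keeps only
// the config sections it can actually use.
func pruneForScheme(scheme string, info *ChangeFeedInfo) {
	if sink.IsBlackHoleScheme(scheme) {
		return // blackhole is for testing; keep everything
	}
	if !sink.IsMQScheme(scheme) {
		info.rmMQOnlyFields() // dispatch rules, schema registry, Kafka config
	}
	if !sink.IsStorageScheme(scheme) {
		info.rmStorageOnlyFields() // CSV config, date separator, file index width
	}
	if !sink.IsMySQLCompatibleScheme(scheme) {
		info.rmDBOnlyFields() // sync point, redo (consistent), safe mode
	}
}
```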
+func (info *ChangeFeedInfo) RmUnusedFields() { + uri, err := url.Parse(info.SinkURI) + if err != nil { + log.Warn( + "failed to parse the sink uri", + zap.Error(err), + zap.Any("sinkUri", info.SinkURI), + ) + return + } + // blackhole is for testing purpose, no need to remove fields + if sink.IsBlackHoleScheme(uri.Scheme) { + return + } + if !sink.IsMQScheme(uri.Scheme) { + info.rmMQOnlyFields() + } else { + // remove schema registry for MQ downstream with + // protocol other than avro + if util.GetOrZero(info.Config.Sink.Protocol) != ProtocolAvro.String() { + info.Config.Sink.SchemaRegistry = nil + } + } + + if !sink.IsStorageScheme(uri.Scheme) { + info.rmStorageOnlyFields() + } + + if !sink.IsMySQLCompatibleScheme(uri.Scheme) { + info.rmDBOnlyFields() + } else { + // remove fields only being used by MQ and Storage downstream + info.Config.Sink.Protocol = nil + info.Config.Sink.Terminator = nil + } +} + +func (info *ChangeFeedInfo) rmMQOnlyFields() { + log.Info("since the downstream is not a MQ, remove MQ only fields", + zap.String("namespace", info.Namespace), + zap.String("changefeed", info.ID)) + info.Config.Sink.DispatchRules = nil + info.Config.Sink.SchemaRegistry = nil + info.Config.Sink.EncoderConcurrency = nil + info.Config.Sink.EnableKafkaSinkV2 = nil + info.Config.Sink.OnlyOutputUpdatedColumns = nil + info.Config.Sink.DeleteOnlyOutputHandleKeyColumns = nil + info.Config.Sink.ContentCompatible = nil + info.Config.Sink.KafkaConfig = nil +} + +func (info *ChangeFeedInfo) rmStorageOnlyFields() { + info.Config.Sink.CSVConfig = nil + info.Config.Sink.DateSeparator = nil + info.Config.Sink.EnablePartitionSeparator = nil + info.Config.Sink.FileIndexWidth = nil + info.Config.Sink.CloudStorageConfig = nil +} + +func (info *ChangeFeedInfo) rmDBOnlyFields() { + info.Config.EnableSyncPoint = nil + info.Config.BDRMode = nil + info.Config.SyncPointInterval = nil + info.Config.SyncPointRetention = nil + info.Config.Consistent = nil + info.Config.Sink.SafeMode = nil + info.Config.Sink.MySQLConfig = nil +} + +// FixIncompatible fixes incompatible changefeed meta info. 
+func (info *ChangeFeedInfo) FixIncompatible() { + creatorVersionGate := version.NewCreatorVersionGate(info.CreatorVersion) + if creatorVersionGate.ChangefeedStateFromAdminJob() { + log.Info("Start fixing incompatible changefeed state", zap.String("changefeed", info.String())) + info.fixState() + log.Info("Fix incompatibility changefeed state completed", zap.String("changefeed", info.String())) + } + + if creatorVersionGate.ChangefeedAcceptUnknownProtocols() { + log.Info("Start fixing incompatible changefeed MQ sink protocol", zap.String("changefeed", info.String())) + info.fixMQSinkProtocol() + log.Info("Fix incompatibility changefeed MQ sink protocol completed", zap.String("changefeed", info.String())) + } + + if creatorVersionGate.ChangefeedAcceptProtocolInMysqlSinURI() { + log.Info("Start fixing incompatible changefeed sink uri", zap.String("changefeed", info.String())) + info.fixMySQLSinkProtocol() + log.Info("Fix incompatibility changefeed sink uri completed", zap.String("changefeed", info.String())) + } + + if info.Config.MemoryQuota == uint64(0) { + log.Info("Start fixing incompatible memory quota", zap.String("changefeed", info.String())) + info.fixMemoryQuota() + log.Info("Fix incompatible memory quota completed", zap.String("changefeed", info.String())) + } + + if info.Config.ChangefeedErrorStuckDuration == nil { + log.Info("Start fixing incompatible error stuck duration", zap.String("changefeed", info.String())) + info.Config.ChangefeedErrorStuckDuration = GetDefaultReplicaConfig().ChangefeedErrorStuckDuration + log.Info("Fix incompatible error stuck duration completed", zap.String("changefeed", info.String())) + } + + log.Info("Start fixing incompatible scheduler", zap.String("changefeed", info.String())) + inheritV66 := creatorVersionGate.ChangefeedInheritSchedulerConfigFromV66() + info.fixScheduler(inheritV66) + log.Info("Fix incompatible scheduler completed", zap.String("changefeed", info.String())) +} + +// fixState attempts to fix state loss from upgrading the old owner to the new owner. +func (info *ChangeFeedInfo) fixState() { + // Notice: In the old owner we used AdminJobType field to determine if the task was paused or not, + // we need to handle this field in the new owner. + // Otherwise, we will see that the old version of the task is paused and then upgraded, + // and the task is automatically resumed after the upgrade. + state := info.State + // Upgrading from an old owner, we need to deal with cases where the state is normal, + // but actually contains errors and does not match the admin job type. + if state == model.StateNormal { + switch info.AdminJobType { + // This corresponds to the case of failure or error. 
+ case model.AdminNone, model.AdminResume: + if info.Error != nil { + if cerror.IsChangefeedGCFastFailErrorCode(errors.RFCErrorCode(info.Error.Code)) { + state = model.StateFailed + } else { + state = model.StateWarning + } + } + case model.AdminStop: + state = model.StateStopped + case model.AdminFinish: + state = model.StateFinished + case model.AdminRemove: + state = model.StateRemoved + } + } + + if state != info.State { + log.Info("handle old owner inconsistent state", + zap.String("oldState", string(info.State)), + zap.String("adminJob", info.AdminJobType.String()), + zap.String("newState", string(state))) + info.State = state + } +} + +func (info *ChangeFeedInfo) fixMySQLSinkProtocol() { + uri, err := url.Parse(info.SinkURI) + if err != nil { + log.Warn("parse sink URI failed", zap.Error(err)) + // SAFETY: It is safe to ignore this unresolvable sink URI here, + // as it is almost impossible for this to happen. + // If we ignore it when fixing it after it happens, + // it will expose the problem when starting the changefeed, + // which is easier to troubleshoot than reporting the error directly in the bootstrap process. + return + } + + if sink.IsMQScheme(uri.Scheme) { + return + } + + query := uri.Query() + protocolStr := query.Get(ProtocolKey) + if protocolStr != "" || info.Config.Sink.Protocol != nil { + maskedSinkURI, _ := util.MaskSinkURI(info.SinkURI) + log.Warn("sink URI or sink config contains protocol, but scheme is not mq", + zap.String("sinkURI", maskedSinkURI), + zap.String("protocol", protocolStr), + zap.Any("sinkConfig", info.Config.Sink)) + // always set protocol of mysql sink to "" + query.Del(ProtocolKey) + info.updateSinkURIAndConfigProtocol(uri, "", query) + } +} + +func (info *ChangeFeedInfo) fixMQSinkProtocol() { + uri, err := url.Parse(info.SinkURI) + if err != nil { + log.Warn("parse sink URI failed", zap.Error(err)) + return + } + + if !sink.IsMQScheme(uri.Scheme) { + return + } + + needsFix := func(protocolStr string) bool { + _, err := ParseSinkProtocolFromString(protocolStr) + // There are two cases: + // 1. there is an error indicating that the old ticdc accepts + // a protocol that is not known. It needs to be fixed as open protocol. + // 2. If it is default, then it needs to be fixed as open protocol. + return err != nil || protocolStr == ProtocolDefault.String() + } + + query := uri.Query() + protocol := query.Get(ProtocolKey) + openProtocol := ProtocolOpen.String() + + // The sinkURI always has a higher priority. 
+ if protocol != "" && needsFix(protocol) { + query.Set(ProtocolKey, openProtocol) + info.updateSinkURIAndConfigProtocol(uri, openProtocol, query) + return + } + + if needsFix(util.GetOrZero(info.Config.Sink.Protocol)) { + log.Info("handle incompatible protocol from sink config", + zap.String("oldProtocol", util.GetOrZero(info.Config.Sink.Protocol)), + zap.String("fixedProtocol", openProtocol)) + info.Config.Sink.Protocol = util.AddressOf(openProtocol) + } +} + +func (info *ChangeFeedInfo) updateSinkURIAndConfigProtocol(uri *url.URL, newProtocol string, newQuery url.Values) { + newRawQuery := newQuery.Encode() + maskedURI, _ := util.MaskSinkURI(uri.String()) + log.Info("handle incompatible protocol from sink URI", + zap.String("oldURI", maskedURI), + zap.String("newProtocol", newProtocol)) + + uri.RawQuery = newRawQuery + fixedSinkURI := uri.String() + info.SinkURI = fixedSinkURI + info.Config.Sink.Protocol = util.AddressOf(newProtocol) +} + +func (info *ChangeFeedInfo) fixMemoryQuota() { + info.Config.FixMemoryQuota() +} + +func (info *ChangeFeedInfo) fixScheduler(inheritV66 bool) { + info.Config.FixScheduler(inheritV66) +} diff --git a/pkg/config/consistent.go b/pkg/config/consistent.go new file mode 100644 index 000000000..fdcfe2c30 --- /dev/null +++ b/pkg/config/consistent.go @@ -0,0 +1,125 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "fmt" + + "github.com/pingcap/tidb/br/pkg/storage" + "github.com/pingcap/tiflow/pkg/compression" + cerror "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/redo" + "github.com/pingcap/tiflow/pkg/util" +) + +// ConsistentConfig represents replication consistency config for a changefeed. +// It is used by redo log functionality. +type ConsistentConfig struct { + // Level is the consistency level, it can be `none` or `eventual`. + // `eventual` means enable redo log. + // Default is `none`. + Level string `toml:"level" json:"level"` + // MaxLogSize is the max size(MiB) of a log file written by redo log. + // Default is 64MiB. + MaxLogSize int64 `toml:"max-log-size" json:"max-log-size"` + // FlushIntervalInMs is the flush interval(ms) of redo log to flush log to storage. + // Default is 2000ms. + FlushIntervalInMs int64 `toml:"flush-interval" json:"flush-interval"` + // MetaFlushIntervalInMs is the flush interval(ms) of redo log to + // flush meta(resolvedTs and checkpointTs) to storage. + // Default is 200ms. + MetaFlushIntervalInMs int64 `toml:"meta-flush-interval" json:"meta-flush-interval"` + // EncodingWorkerNum is the number of workers to encode `RowChangeEvent`` to redo log. + // Default is 16. + EncodingWorkerNum int `toml:"encoding-worker-num" json:"encoding-worker-num"` + // FlushWorkerNum is the number of workers to flush redo log to storage. + // Default is 8. + FlushWorkerNum int `toml:"flush-worker-num" json:"flush-worker-num"` + // Storage is the storage path(uri) to store redo log. + Storage string `toml:"storage" json:"storage"` + // UseFileBackend is a flag to enable file backend for redo log. 
+ // file backend means the redo log is written to a local file before it is flushed to storage. + // Default is false. + UseFileBackend bool `toml:"use-file-backend" json:"use-file-backend"` + // Compression is the compression algorithm used for redo log. + // Default is "", it means no compression, equals to `none`. + // Supported compression algorithms are `none` and `lz4`. + Compression string `toml:"compression" json:"compression"` + // FlushConcurrency is the concurrency of flushing a single log file. + // Default is 1. It means a single log file will be flushed by only one worker. + // The single-file concurrent flushing feature supports only `s3` storage. + FlushConcurrency int `toml:"flush-concurrency" json:"flush-concurrency,omitempty"` + // MemoryUsage represents the percentage of ReplicaConfig.MemoryQuota + // that can be utilized by the redo log module. + MemoryUsage *ConsistentMemoryUsage `toml:"memory-usage" json:"memory-usage"` +} + +// ConsistentMemoryUsage represents memory usage of Consistent module. +type ConsistentMemoryUsage struct { + // ReplicaConfig.MemoryQuota * MemoryQuotaPercentage / 100 will be used for redo events. + MemoryQuotaPercentage uint64 `toml:"memory-quota-percentage" json:"memory-quota-percentage"` +} + +// ValidateAndAdjust validates the consistency config and adjusts it if necessary. +func (c *ConsistentConfig) ValidateAndAdjust() error { + if !redo.IsConsistentEnabled(c.Level) { + return nil + } + + if c.MaxLogSize == 0 { + c.MaxLogSize = redo.DefaultMaxLogSize + } + + if c.FlushIntervalInMs == 0 { + c.FlushIntervalInMs = redo.DefaultFlushIntervalInMs + } + if c.FlushIntervalInMs < redo.MinFlushIntervalInMs { + return cerror.ErrInvalidReplicaConfig.FastGenByArgs( + fmt.Sprintf("The consistent.flush-interval:%d must be equal or greater than %d", + c.FlushIntervalInMs, redo.MinFlushIntervalInMs)) + } + + if c.MetaFlushIntervalInMs == 0 { + c.MetaFlushIntervalInMs = redo.DefaultMetaFlushIntervalInMs + } + if c.MetaFlushIntervalInMs < redo.MinFlushIntervalInMs { + return cerror.ErrInvalidReplicaConfig.FastGenByArgs( + fmt.Sprintf("The consistent.meta-flush-interval:%d must be equal or greater than %d", + c.MetaFlushIntervalInMs, redo.MinFlushIntervalInMs)) + } + if len(c.Compression) > 0 && + c.Compression != compression.None && c.Compression != compression.LZ4 { + return cerror.ErrInvalidReplicaConfig.FastGenByArgs( + fmt.Sprintf("The consistent.compression:%s must be 'none' or 'lz4'", c.Compression)) + } + + if c.EncodingWorkerNum == 0 { + c.EncodingWorkerNum = redo.DefaultEncodingWorkerNum + } + if c.FlushWorkerNum == 0 { + c.FlushWorkerNum = redo.DefaultFlushWorkerNum + } + + uri, err := storage.ParseRawURL(c.Storage) + if err != nil { + return cerror.ErrInvalidReplicaConfig.GenWithStackByArgs( + fmt.Sprintf("invalid storage uri: %s", c.Storage)) + } + return redo.ValidateStorage(uri) +} + +// MaskSensitiveData masks sensitive data in ConsistentConfig +func (c *ConsistentConfig) MaskSensitiveData() { + c.Storage = util.MaskSensitiveDataInURI(c.Storage) +} diff --git a/pkg/config/debug.go b/pkg/config/debug.go index 00e38f841..a7fc9d981 100644 --- a/pkg/config/debug.go +++ b/pkg/config/debug.go @@ -28,9 +28,6 @@ type DebugConfig struct { // Scheduler is the configuration of the two-phase scheduler. Scheduler *SchedulerConfig `toml:"scheduler" json:"scheduler"` - // CDCV2 enables ticdc version 2 implementation with new metastore - CDCV2 *CDCV2 `toml:"cdc-v2" json:"cdc-v2"` - // Puller is the configuration of the puller.
Puller *PullerConfig `toml:"puller" json:"puller"` } @@ -46,9 +43,6 @@ func (c *DebugConfig) ValidateAndAdjust() error { if err := c.Scheduler.ValidateAndAdjust(); err != nil { return errors.Trace(err) } - if err := c.CDCV2.ValidateAndAdjust(); err != nil { - return errors.Trace(err) - } return nil } diff --git a/pkg/config/filter.go b/pkg/config/filter.go new file mode 100644 index 000000000..f8409a5ff --- /dev/null +++ b/pkg/config/filter.go @@ -0,0 +1,38 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + bf "github.com/pingcap/tiflow/pkg/binlog-filter" +) + +// FilterConfig represents filter config for a changefeed +type FilterConfig struct { + Rules []string `toml:"rules" json:"rules"` + IgnoreTxnStartTs []uint64 `toml:"ignore-txn-start-ts" json:"ignore-txn-start-ts"` + EventFilters []*EventFilterRule `toml:"event-filters" json:"event-filters"` +} + +// EventFilterRule is used by sql event filter and expression filter +type EventFilterRule struct { + Matcher []string `toml:"matcher" json:"matcher"` + IgnoreEvent []bf.EventType `toml:"ignore-event" json:"ignore-event"` + // regular expression + IgnoreSQL []string `toml:"ignore-sql" json:"ignore-sql"` + // sql expression + IgnoreInsertValueExpr string `toml:"ignore-insert-value-expr" json:"ignore-insert-value-expr"` + IgnoreUpdateNewValueExpr string `toml:"ignore-update-new-value-expr" json:"ignore-update-new-value-expr"` + IgnoreUpdateOldValueExpr string `toml:"ignore-update-old-value-expr" json:"ignore-update-old-value-expr"` + IgnoreDeleteValueExpr string `toml:"ignore-delete-value-expr" json:"ignore-delete-value-expr"` +} diff --git a/pkg/config/integrity.go b/pkg/config/integrity.go new file mode 100644 index 000000000..5be0f9993 --- /dev/null +++ b/pkg/config/integrity.go @@ -0,0 +1,71 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "github.com/pingcap/log" + cerror "github.com/pingcap/tiflow/pkg/errors" + "go.uber.org/zap" +) + +// Config represents integrity check config for a changefeed. +type Config struct { + IntegrityCheckLevel string `toml:"integrity-check-level" json:"integrity-check-level"` + CorruptionHandleLevel string `toml:"corruption-handle-level" json:"corruption-handle-level"` +} + +const ( + // CheckLevelNone means no integrity check, the default value. + CheckLevelNone string = "none" + // CheckLevelCorrectness means check each row data correctness. 
+ CheckLevelCorrectness string = "correctness" +) + +const ( + // CorruptionHandleLevelWarn is the default value, + // log the corrupted event, and mark it as corrupted and send it to the downstream. + CorruptionHandleLevelWarn string = "warn" + // CorruptionHandleLevelError means log the corrupted event, and then stop the changefeed. + CorruptionHandleLevelError string = "error" +) + +// Validate the integrity config. +func (c *Config) Validate() error { + if c.IntegrityCheckLevel != CheckLevelNone && + c.IntegrityCheckLevel != CheckLevelCorrectness { + return cerror.ErrInvalidReplicaConfig.GenWithStackByArgs() + } + if c.CorruptionHandleLevel != CorruptionHandleLevelWarn && + c.CorruptionHandleLevel != CorruptionHandleLevelError { + return cerror.ErrInvalidReplicaConfig.GenWithStackByArgs() + } + + if c.Enabled() { + log.Info("integrity check is enabled", + zap.Any("integrityCheckLevel", c.IntegrityCheckLevel), + zap.Any("corruptionHandleLevel", c.CorruptionHandleLevel)) + } + + return nil +} + +// Enabled returns true if the integrity check is enabled. +func (c *Config) Enabled() bool { + return c.IntegrityCheckLevel == CheckLevelCorrectness +} + +// ErrorHandle returns true if the corruption handle level is error. +func (c *Config) ErrorHandle() bool { + return c.CorruptionHandleLevel == CorruptionHandleLevelError +} diff --git a/pkg/config/large_message.go b/pkg/config/large_message.go new file mode 100644 index 000000000..587a37f4c --- /dev/null +++ b/pkg/config/large_message.go @@ -0,0 +1,115 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "github.com/pingcap/tiflow/pkg/compression" + cerror "github.com/pingcap/tiflow/pkg/errors" +) + +const ( + // LargeMessageHandleOptionNone means not handling large message. + LargeMessageHandleOptionNone string = "none" + // LargeMessageHandleOptionClaimCheck means handling large message by sending to the claim check storage. + LargeMessageHandleOptionClaimCheck string = "claim-check" + // LargeMessageHandleOptionHandleKeyOnly means handling large message by sending only handle key columns. + LargeMessageHandleOptionHandleKeyOnly string = "handle-key-only" +) + +// LargeMessageHandleConfig is the configuration for handling large message. +type LargeMessageHandleConfig struct { + LargeMessageHandleOption string `toml:"large-message-handle-option" json:"large-message-handle-option"` + LargeMessageHandleCompression string `toml:"large-message-handle-compression" json:"large-message-handle-compression"` + ClaimCheckStorageURI string `toml:"claim-check-storage-uri" json:"claim-check-storage-uri"` + ClaimCheckRawValue bool `toml:"claim-check-raw-value" json:"claim-check-raw-value"` +} + +// NewDefaultLargeMessageHandleConfig returns the default LargeMessageHandleConfig. +func NewDefaultLargeMessageHandleConfig() *LargeMessageHandleConfig { + return &LargeMessageHandleConfig{ + LargeMessageHandleOption: LargeMessageHandleOptionNone, + LargeMessageHandleCompression: compression.None, + } +} + +// AdjustAndValidate adjusts and validates the LargeMessageHandleConfig.
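Under the rules enforced below, for example, claim-check over canal-json requires enable-tidb-extension; a hypothetical failing call (the storage URI is made up for illustration):

```go
func validateClaimCheckSketch() error {
	cfg := &LargeMessageHandleConfig{
		LargeMessageHandleOption: LargeMessageHandleOptionClaimCheck,
		ClaimCheckStorageURI:     "s3://bucket/claim-check", // illustrative URI
	}
	// canal-json permits large-message handling only together with
	// enable-tidb-extension; false here yields ErrInvalidReplicaConfig.
	return cfg.AdjustAndValidate(ProtocolCanalJSON, false)
}
```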
+func (c *LargeMessageHandleConfig) AdjustAndValidate(protocol Protocol, enableTiDBExtension bool) error { + if c.LargeMessageHandleOption == "" { + c.LargeMessageHandleOption = LargeMessageHandleOptionNone + } + + if c.LargeMessageHandleCompression == "" { + c.LargeMessageHandleCompression = compression.None + } + + // compression can be enabled independently + if !compression.Supported(c.LargeMessageHandleCompression) { + return cerror.ErrInvalidReplicaConfig.GenWithStack( + "large message handle compression is not supported, got %s", c.LargeMessageHandleCompression) + } + if c.LargeMessageHandleOption == LargeMessageHandleOptionNone { + return nil + } + + switch protocol { + case ProtocolOpen, ProtocolSimple: + case ProtocolCanalJSON: + if !enableTiDBExtension { + return cerror.ErrInvalidReplicaConfig.GenWithStack( + "large message handle is set to %s, protocol is %s, but enable-tidb-extension is false", + c.LargeMessageHandleOption, protocol.String()) + } + default: + return cerror.ErrInvalidReplicaConfig.GenWithStack( + "large message handle is set to %s, protocol is %s, it's not supported", + c.LargeMessageHandleOption, protocol.String()) + } + + if c.LargeMessageHandleOption == LargeMessageHandleOptionClaimCheck { + if c.ClaimCheckStorageURI == "" { + return cerror.ErrInvalidReplicaConfig.GenWithStack( + "large message handle is set to claim-check, but the claim-check-storage-uri is empty") + } + if c.ClaimCheckRawValue && protocol == ProtocolOpen { + return cerror.ErrInvalidReplicaConfig.GenWithStack( + "large message handle is set to claim-check, raw value is not supported for the open protocol") + } + } + + return nil +} + +// HandleKeyOnly returns true if large messages are handled by encoding only the handle-key columns. +func (c *LargeMessageHandleConfig) HandleKeyOnly() bool { + if c == nil { + return false + } + return c.LargeMessageHandleOption == LargeMessageHandleOptionHandleKeyOnly +} + +// EnableClaimCheck returns true if claim check is enabled. +func (c *LargeMessageHandleConfig) EnableClaimCheck() bool { + if c == nil { + return false + } + return c.LargeMessageHandleOption == LargeMessageHandleOptionClaimCheck +} + +// Disabled returns true if large message handling is disabled. +func (c *LargeMessageHandleConfig) Disabled() bool { + if c == nil { + return false + } + return c.LargeMessageHandleOption == LargeMessageHandleOptionNone +} diff --git a/pkg/config/mounter.go b/pkg/config/mounter.go new file mode 100644 index 000000000..d3f3b069b --- /dev/null +++ b/pkg/config/mounter.go @@ -0,0 +1,19 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License.
+ +package config + +// MounterConfig represents mounter config for a changefeed +type MounterConfig struct { + WorkerNum int `toml:"worker-num" json:"worker-num"` +} diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index 6895119ed..af590f7e6 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -52,10 +52,10 @@ var defaultReplicaConfig = &ReplicaConfig{ SyncPointInterval: util.AddressOf(10 * time.Minute), SyncPointRetention: util.AddressOf(24 * time.Hour), BDRMode: util.AddressOf(false), - Filter: &config.FilterConfig{ + Filter: &FilterConfig{ Rules: []string{"*.*"}, }, - Mounter: &config.MounterConfig{ + Mounter: &MounterConfig{ WorkerNum: 16, }, Sink: &SinkConfig{ @@ -83,7 +83,7 @@ var defaultReplicaConfig = &ReplicaConfig{ OpenProtocol: &OpenProtocolConfig{OutputOldValue: true}, Debezium: &DebeziumConfig{OutputOldValue: true}, }, - Consistent: &config.ConsistentConfig{ + Consistent: &ConsistentConfig{ Level: "none", MaxLogSize: redo.DefaultMaxLogSize, FlushIntervalInMs: redo.DefaultFlushIntervalInMs, @@ -93,11 +93,11 @@ var defaultReplicaConfig = &ReplicaConfig{ Storage: "", UseFileBackend: false, Compression: "", - MemoryUsage: &config.ConsistentMemoryUsage{ + MemoryUsage: &ConsistentMemoryUsage{ MemoryQuotaPercentage: 50, }, }, - Scheduler: &config.ChangefeedSchedulerConfig{ + Scheduler: &ChangefeedSchedulerConfig{ EnableTableAcrossNodes: false, RegionThreshold: 100_000, WriteKeyThreshold: 0, @@ -107,7 +107,7 @@ var defaultReplicaConfig = &ReplicaConfig{ CorruptionHandleLevel: integrity.CorruptionHandleLevelWarn, }, ChangefeedErrorStuckDuration: util.AddressOf(time.Minute * 30), - SyncedStatus: &config.SyncedStatusConfig{SyncedCheckInterval: 5 * 60, CheckpointInterval: 15}, + SyncedStatus: &SyncedStatusConfig{SyncedCheckInterval: 5 * 60, CheckpointInterval: 15}, } // GetDefaultReplicaConfig returns the default replica config. @@ -149,18 +149,18 @@ type replicaConfig struct { // SyncPointInterval is only available when the downstream is DB. SyncPointInterval *time.Duration `toml:"sync-point-interval" json:"sync-point-interval,omitempty"` // SyncPointRetention is only available when the downstream is DB. - SyncPointRetention *time.Duration `toml:"sync-point-retention" json:"sync-point-retention,omitempty"` - Filter *config.FilterConfig `toml:"filter" json:"filter"` - Mounter *config.MounterConfig `toml:"mounter" json:"mounter"` - Sink *SinkConfig `toml:"sink" json:"sink"` + SyncPointRetention *time.Duration `toml:"sync-point-retention" json:"sync-point-retention,omitempty"` + Filter *FilterConfig `toml:"filter" json:"filter,omitempty"` + Mounter *MounterConfig `toml:"mounter" json:"mounter,omitempty"` + Sink *SinkConfig `toml:"sink" json:"sink,omitempty"` // Consistent is only available for DB downstream with redo feature enabled. - Consistent *config.ConsistentConfig `toml:"consistent" json:"consistent,omitempty"` + Consistent *ConsistentConfig `toml:"consistent" json:"consistent,omitempty"` // Scheduler is the configuration for scheduler. - Scheduler *config.ChangefeedSchedulerConfig `toml:"scheduler" json:"scheduler"` + Scheduler *ChangefeedSchedulerConfig `toml:"scheduler" json:"scheduler,omitempty"` // Integrity is only available when the downstream is MQ. 
- Integrity *integrity.Config `toml:"integrity" json:"integrity"` - ChangefeedErrorStuckDuration *time.Duration `toml:"changefeed-error-stuck-duration" json:"changefeed-error-stuck-duration,omitempty"` - SyncedStatus *config.SyncedStatusConfig `toml:"synced-status" json:"synced-status,omitempty"` + Integrity *integrity.Config `toml:"integrity" json:"integrity"` + ChangefeedErrorStuckDuration *time.Duration `toml:"changefeed-error-stuck-duration" json:"changefeed-error-stuck-duration,omitempty"` + SyncedStatus *SyncedStatusConfig `toml:"synced-status" json:"synced-status,omitempty"` // Deprecated: we don't use this field since v8.0.0. SQLMode string `toml:"sql-mode" json:"sql-mode"` @@ -333,11 +333,6 @@ func (c *ReplicaConfig) FixScheduler(inheritV66 bool) { c.Scheduler = defaultReplicaConfig.Clone().Scheduler return } - if inheritV66 && c.Scheduler.RegionPerSpan != 0 { - c.Scheduler.EnableTableAcrossNodes = true - c.Scheduler.RegionThreshold = c.Scheduler.RegionPerSpan - c.Scheduler.RegionPerSpan = 0 - } } // FixMemoryQuota adjusts memory quota to default value diff --git a/pkg/config/scheduler.go b/pkg/config/scheduler_config.go similarity index 98% rename from pkg/config/scheduler.go rename to pkg/config/scheduler_config.go index 036660663..cb9a279dc 100644 --- a/pkg/config/scheduler.go +++ b/pkg/config/scheduler_config.go @@ -29,8 +29,6 @@ type ChangefeedSchedulerConfig struct { RegionThreshold int `toml:"region-threshold" json:"region-threshold"` // WriteKeyThreshold is the written keys threshold of splitting a table. WriteKeyThreshold int `toml:"write-key-threshold" json:"write-key-threshold"` - // Deprecated. - RegionPerSpan int `toml:"region-per-span" json:"region-per-span"` } // Validate validates the config. diff --git a/pkg/config/server.go b/pkg/config/server.go index 129af371e..2d6fdc1cf 100644 --- a/pkg/config/server.go +++ b/pkg/config/server.go @@ -118,7 +118,6 @@ var defaultServerConfig = &ServerConfig{ Messages: defaultMessageConfig.Clone(), Scheduler: NewDefaultSchedulerConfig(), - CDCV2: &CDCV2{Enable: false}, Puller: NewDefaultPullerConfig(), }, ClusterID: "default", diff --git a/pkg/config/sink_config.go b/pkg/config/sink.go similarity index 85% rename from pkg/config/sink_config.go rename to pkg/config/sink.go index c84afacd7..16cf46f6d 100644 --- a/pkg/config/sink_config.go +++ b/pkg/config/sink.go @@ -1,3 +1,16 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + package config import ( @@ -8,12 +21,10 @@ import ( "time" "github.com/apache/pulsar-client-go/pulsar" - "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go-v2/aws" "github.com/pingcap/errors" "github.com/pingcap/log" - "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" - "github.com/pingcap/tiflow/pkg/integrity" "github.com/pingcap/tiflow/pkg/sink" "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" @@ -195,8 +206,8 @@ type SinkConfig struct { CaseSensitive bool `toml:"case-sensitive" json:"case-sensitive"` // Integrity is only available when the downstream is MQ. 
- Integrity *integrity.Config `toml:"integrity" json:"integrity"` - ForceReplicate bool `toml:"force-replicate" json:"force-replicate"` + Integrity *Config `toml:"integrity" json:"integrity"` + ForceReplicate bool `toml:"force-replicate" json:"force-replicate"` } // MaskSensitiveData masks sensitive data in SinkConfig @@ -221,7 +232,7 @@ func (s *SinkConfig) ShouldSendBootstrapMsg() bool { } protocol := util.GetOrZero(s.Protocol) - return protocol == config.ProtocolSimple.String() && + return protocol == ProtocolSimple.String() && util.GetOrZero(s.SendBootstrapIntervalInSec) > 0 && util.GetOrZero(s.SendBootstrapInMsgCount) > 0 } @@ -406,42 +417,42 @@ type CodecConfig struct { // KafkaConfig represents a kafka sink configuration type KafkaConfig struct { - PartitionNum *int32 `toml:"partition-num" json:"partition-num,omitempty"` - ReplicationFactor *int16 `toml:"replication-factor" json:"replication-factor,omitempty"` - KafkaVersion *string `toml:"kafka-version" json:"kafka-version,omitempty"` - MaxMessageBytes *int `toml:"max-message-bytes" json:"max-message-bytes,omitempty"` - Compression *string `toml:"compression" json:"compression,omitempty"` - KafkaClientID *string `toml:"kafka-client-id" json:"kafka-client-id,omitempty"` - AutoCreateTopic *bool `toml:"auto-create-topic" json:"auto-create-topic,omitempty"` - DialTimeout *string `toml:"dial-timeout" json:"dial-timeout,omitempty"` - WriteTimeout *string `toml:"write-timeout" json:"write-timeout,omitempty"` - ReadTimeout *string `toml:"read-timeout" json:"read-timeout,omitempty"` - RequiredAcks *int `toml:"required-acks" json:"required-acks,omitempty"` - SASLUser *string `toml:"sasl-user" json:"sasl-user,omitempty"` - SASLPassword *string `toml:"sasl-password" json:"sasl-password,omitempty"` - SASLMechanism *string `toml:"sasl-mechanism" json:"sasl-mechanism,omitempty"` - SASLGssAPIAuthType *string `toml:"sasl-gssapi-auth-type" json:"sasl-gssapi-auth-type,omitempty"` - SASLGssAPIKeytabPath *string `toml:"sasl-gssapi-keytab-path" json:"sasl-gssapi-keytab-path,omitempty"` - SASLGssAPIKerberosConfigPath *string `toml:"sasl-gssapi-kerberos-config-path" json:"sasl-gssapi-kerberos-config-path,omitempty"` - SASLGssAPIServiceName *string `toml:"sasl-gssapi-service-name" json:"sasl-gssapi-service-name,omitempty"` - SASLGssAPIUser *string `toml:"sasl-gssapi-user" json:"sasl-gssapi-user,omitempty"` - SASLGssAPIPassword *string `toml:"sasl-gssapi-password" json:"sasl-gssapi-password,omitempty"` - SASLGssAPIRealm *string `toml:"sasl-gssapi-realm" json:"sasl-gssapi-realm,omitempty"` - SASLGssAPIDisablePafxfast *bool `toml:"sasl-gssapi-disable-pafxfast" json:"sasl-gssapi-disable-pafxfast,omitempty"` - SASLOAuthClientID *string `toml:"sasl-oauth-client-id" json:"sasl-oauth-client-id,omitempty"` - SASLOAuthClientSecret *string `toml:"sasl-oauth-client-secret" json:"sasl-oauth-client-secret,omitempty"` - SASLOAuthTokenURL *string `toml:"sasl-oauth-token-url" json:"sasl-oauth-token-url,omitempty"` - SASLOAuthScopes []string `toml:"sasl-oauth-scopes" json:"sasl-oauth-scopes,omitempty"` - SASLOAuthGrantType *string `toml:"sasl-oauth-grant-type" json:"sasl-oauth-grant-type,omitempty"` - SASLOAuthAudience *string `toml:"sasl-oauth-audience" json:"sasl-oauth-audience,omitempty"` - EnableTLS *bool `toml:"enable-tls" json:"enable-tls,omitempty"` - CA *string `toml:"ca" json:"ca,omitempty"` - Cert *string `toml:"cert" json:"cert,omitempty"` - Key *string `toml:"key" json:"key,omitempty"` - InsecureSkipVerify *bool `toml:"insecure-skip-verify" 
json:"insecure-skip-verify,omitempty"` - CodecConfig *CodecConfig `toml:"codec-config" json:"codec-config,omitempty"` - LargeMessageHandle *config.LargeMessageHandleConfig `toml:"large-message-handle" json:"large-message-handle,omitempty"` - GlueSchemaRegistryConfig *GlueSchemaRegistryConfig `toml:"glue-schema-registry-config" json:"glue-schema-registry-config"` + PartitionNum *int32 `toml:"partition-num" json:"partition-num,omitempty"` + ReplicationFactor *int16 `toml:"replication-factor" json:"replication-factor,omitempty"` + KafkaVersion *string `toml:"kafka-version" json:"kafka-version,omitempty"` + MaxMessageBytes *int `toml:"max-message-bytes" json:"max-message-bytes,omitempty"` + Compression *string `toml:"compression" json:"compression,omitempty"` + KafkaClientID *string `toml:"kafka-client-id" json:"kafka-client-id,omitempty"` + AutoCreateTopic *bool `toml:"auto-create-topic" json:"auto-create-topic,omitempty"` + DialTimeout *string `toml:"dial-timeout" json:"dial-timeout,omitempty"` + WriteTimeout *string `toml:"write-timeout" json:"write-timeout,omitempty"` + ReadTimeout *string `toml:"read-timeout" json:"read-timeout,omitempty"` + RequiredAcks *int `toml:"required-acks" json:"required-acks,omitempty"` + SASLUser *string `toml:"sasl-user" json:"sasl-user,omitempty"` + SASLPassword *string `toml:"sasl-password" json:"sasl-password,omitempty"` + SASLMechanism *string `toml:"sasl-mechanism" json:"sasl-mechanism,omitempty"` + SASLGssAPIAuthType *string `toml:"sasl-gssapi-auth-type" json:"sasl-gssapi-auth-type,omitempty"` + SASLGssAPIKeytabPath *string `toml:"sasl-gssapi-keytab-path" json:"sasl-gssapi-keytab-path,omitempty"` + SASLGssAPIKerberosConfigPath *string `toml:"sasl-gssapi-kerberos-config-path" json:"sasl-gssapi-kerberos-config-path,omitempty"` + SASLGssAPIServiceName *string `toml:"sasl-gssapi-service-name" json:"sasl-gssapi-service-name,omitempty"` + SASLGssAPIUser *string `toml:"sasl-gssapi-user" json:"sasl-gssapi-user,omitempty"` + SASLGssAPIPassword *string `toml:"sasl-gssapi-password" json:"sasl-gssapi-password,omitempty"` + SASLGssAPIRealm *string `toml:"sasl-gssapi-realm" json:"sasl-gssapi-realm,omitempty"` + SASLGssAPIDisablePafxfast *bool `toml:"sasl-gssapi-disable-pafxfast" json:"sasl-gssapi-disable-pafxfast,omitempty"` + SASLOAuthClientID *string `toml:"sasl-oauth-client-id" json:"sasl-oauth-client-id,omitempty"` + SASLOAuthClientSecret *string `toml:"sasl-oauth-client-secret" json:"sasl-oauth-client-secret,omitempty"` + SASLOAuthTokenURL *string `toml:"sasl-oauth-token-url" json:"sasl-oauth-token-url,omitempty"` + SASLOAuthScopes []string `toml:"sasl-oauth-scopes" json:"sasl-oauth-scopes,omitempty"` + SASLOAuthGrantType *string `toml:"sasl-oauth-grant-type" json:"sasl-oauth-grant-type,omitempty"` + SASLOAuthAudience *string `toml:"sasl-oauth-audience" json:"sasl-oauth-audience,omitempty"` + EnableTLS *bool `toml:"enable-tls" json:"enable-tls,omitempty"` + CA *string `toml:"ca" json:"ca,omitempty"` + Cert *string `toml:"cert" json:"cert,omitempty"` + Key *string `toml:"key" json:"key,omitempty"` + InsecureSkipVerify *bool `toml:"insecure-skip-verify" json:"insecure-skip-verify,omitempty"` + CodecConfig *CodecConfig `toml:"codec-config" json:"codec-config,omitempty"` + LargeMessageHandle *LargeMessageHandleConfig `toml:"large-message-handle" json:"large-message-handle,omitempty"` + GlueSchemaRegistryConfig *GlueSchemaRegistryConfig `toml:"glue-schema-registry-config" json:"glue-schema-registry-config"` // OutputRawChangeEvent controls whether to split the update 
pk/uk events. OutputRawChangeEvent *bool `toml:"output-raw-change-event" json:"output-raw-change-event,omitempty"` @@ -705,7 +716,7 @@ func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error { return nil } - protocol, _ := config.ParseSinkProtocolFromString(util.GetOrZero(s.Protocol)) + protocol, _ := ParseSinkProtocolFromString(util.GetOrZero(s.Protocol)) if s.KafkaConfig != nil && s.KafkaConfig.LargeMessageHandle != nil { var ( @@ -777,7 +788,7 @@ func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error { s.Terminator = util.AddressOf(CRLF) } - if util.GetOrZero(s.DeleteOnlyOutputHandleKeyColumns) && protocol == config.ProtocolCsv { + if util.GetOrZero(s.DeleteOnlyOutputHandleKeyColumns) && protocol == ProtocolCsv { return cerror.ErrSinkInvalidConfig.GenWithStack( "CSV protocol always output all columns for the delete event, " + "do not set `delete-only-output-handle-key-columns` to true") @@ -855,25 +866,25 @@ func (s *SinkConfig) validateAndAdjustSinkURI(sinkURI *url.URL) error { // ValidateProtocol validates the protocol configuration. func (s *SinkConfig) ValidateProtocol(scheme string) error { - protocol, err := config.ParseSinkProtocolFromString(util.GetOrZero(s.Protocol)) + protocol, err := ParseSinkProtocolFromString(util.GetOrZero(s.Protocol)) if err != nil { return err } outputOldValue := false switch protocol { - case config.ProtocolOpen: + case ProtocolOpen: if s.OpenProtocol != nil { outputOldValue = s.OpenProtocol.OutputOldValue } - case config.ProtocolDebezium: + case ProtocolDebezium: if s.Debezium != nil { outputOldValue = s.Debezium.OutputOldValue } - case config.ProtocolCsv: + case ProtocolCsv: if s.CSVConfig != nil { outputOldValue = s.CSVConfig.OutputOldValue } - case config.ProtocolAvro: + case ProtocolAvro: outputOldValue = false default: return nil @@ -906,32 +917,47 @@ func (s *SinkConfig) applyParameterBySinkURI(sinkURI *url.URL) error { return nil } + cfgInSinkURI := map[string]string{} + cfgInFile := map[string]string{} params := sinkURI.Query() - var errFromURI, errFromFile strings.Builder txnAtomicityFromURI := AtomicityLevel(params.Get(TxnAtomicityKey)) if txnAtomicityFromURI != unknownTxnAtomicity { if util.GetOrZero(s.TxnAtomicity) != unknownTxnAtomicity && util.GetOrZero(s.TxnAtomicity) != txnAtomicityFromURI { - errFromURI.WriteString(fmt.Sprintf("%s=%s, ", TxnAtomicityKey, txnAtomicityFromURI)) - errFromFile.WriteString(fmt.Sprintf("%s=%s, ", TxnAtomicityKey, util.GetOrZero(s.TxnAtomicity))) + cfgInSinkURI[TxnAtomicityKey] = string(txnAtomicityFromURI) + cfgInFile[TxnAtomicityKey] = string(util.GetOrZero(s.TxnAtomicity)) } s.TxnAtomicity = util.AddressOf(txnAtomicityFromURI) } - protocolFromURI := params.Get(config.ProtocolKey) + protocolFromURI := params.Get(ProtocolKey) if protocolFromURI != "" { if s.Protocol != nil && util.GetOrZero(s.Protocol) != protocolFromURI { - errFromURI.WriteString(fmt.Sprintf("%s=%s, ", config.ProtocolKey, protocolFromURI)) - errFromFile.WriteString(fmt.Sprintf("%s=%s, ", config.ProtocolKey, util.GetOrZero(s.Protocol))) + cfgInSinkURI[ProtocolKey] = protocolFromURI + cfgInFile[ProtocolKey] = util.GetOrZero(s.Protocol) } s.Protocol = util.AddressOf(protocolFromURI) } - if errFromURI.Len() == 0 && errFromFile.Len() == 0 { - return nil + getError := func() error { + if len(cfgInSinkURI) != len(cfgInFile) { + log.Panic("inconsistent configuration items in sink uri and configuration file", + zap.Any("cfgInSinkURI", cfgInSinkURI), zap.Any("cfgInFile", cfgInFile)) + } + if len(cfgInSinkURI) == 0 && len(cfgInFile) == 
0 { + return nil + } + getErrMsg := func(cfgIn map[string]string) string { + var errMsg strings.Builder + for k, v := range cfgIn { + errMsg.WriteString(fmt.Sprintf("%s=%s, ", k, v)) + } + return errMsg.String()[0 : errMsg.Len()-2] + } + return cerror.ErrIncompatibleSinkConfig.GenWithStackByArgs( + getErrMsg(cfgInSinkURI), getErrMsg(cfgInFile)) } - return cerror.ErrIncompatibleSinkConfig.GenWithStackByArgs( - errFromURI.String()[0:errFromURI.Len()-2], errFromFile.String()[0:errFromFile.Len()-2]) + return getError() } // CheckCompatibilityWithSinkURI checks whether the sinkURI is compatible with the sink config. diff --git a/pkg/config/sink_protocol.go b/pkg/config/sink_protocol.go new file mode 100644 index 000000000..63a0941ce --- /dev/null +++ b/pkg/config/sink_protocol.go @@ -0,0 +1,106 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "strings" + + cerror "github.com/pingcap/tiflow/pkg/errors" +) + +const ( + // ProtocolKey specifies the key of the protocol in the SinkURI. + ProtocolKey = "protocol" +) + +// Protocol is the protocol of the message. +type Protocol int + +// Enum types of the Protocol. +const ( + ProtocolUnknown Protocol = iota + ProtocolDefault + ProtocolCanal + ProtocolAvro + ProtocolMaxwell + ProtocolCanalJSON + ProtocolCraft + ProtocolOpen + ProtocolCsv + ProtocolDebezium + ProtocolSimple +) + +// IsBatchEncode returns whether the protocol is a batch encoder. +func (p Protocol) IsBatchEncode() bool { + return p == ProtocolOpen || p == ProtocolCanal || p == ProtocolMaxwell || p == ProtocolCraft +} + +// ParseSinkProtocolFromString converts the protocol from string to Protocol enum type. +func ParseSinkProtocolFromString(protocol string) (Protocol, error) { + switch strings.ToLower(protocol) { + case "default": + return ProtocolOpen, nil + case "canal": + return ProtocolCanal, nil + case "avro": + return ProtocolAvro, nil + case "flat-avro": + return ProtocolAvro, nil + case "maxwell": + return ProtocolMaxwell, nil + case "canal-json": + return ProtocolCanalJSON, nil + case "craft": + return ProtocolCraft, nil + case "open-protocol": + return ProtocolOpen, nil + case "csv": + return ProtocolCsv, nil + case "debezium": + return ProtocolDebezium, nil + case "simple": + return ProtocolSimple, nil + default: + return ProtocolUnknown, cerror.ErrSinkUnknownProtocol.GenWithStackByArgs(protocol) + } +}
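Note that both "default" and "open-protocol" normalize to ProtocolOpen, so ParseSinkProtocolFromString never yields ProtocolDefault. A short sketch of the round trip (illustrative only, not part of the patch):

package main

import (
	"fmt"

	"github.com/flowbehappy/tigate/pkg/config"
)

func main() {
	for _, name := range []string{"default", "open-protocol", "canal-json", "bogus"} {
		p, err := config.ParseSinkProtocolFromString(name)
		if err != nil {
			// Unknown names land here with ErrSinkUnknownProtocol.
			fmt.Printf("%s -> %v\n", name, err)
			continue
		}
		// "default" and "open-protocol" both print as "open-protocol".
		fmt.Printf("%s -> %s\n", name, p)
	}
}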
+ +// String returns the string representation of the Protocol. +func (p Protocol) String() string { + switch p { + case ProtocolDefault: + return "default" + case ProtocolCanal: + return "canal" + case ProtocolAvro: + return "avro" + case ProtocolMaxwell: + return "maxwell" + case ProtocolCanalJSON: + return "canal-json" + case ProtocolCraft: + return "craft" + case ProtocolOpen: + return "open-protocol" + case ProtocolCsv: + return "csv" + case ProtocolDebezium: + return "debezium" + case ProtocolSimple: + return "simple" + default: + panic("unreachable") + } +} diff --git a/pkg/config/synced_status_config.go b/pkg/config/synced_status_config.go new file mode 100644 index 000000000..fb657f0f9 --- /dev/null +++ b/pkg/config/synced_status_config.go @@ -0,0 +1,23 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +// SyncedStatusConfig represents synced check interval config for a changefeed +type SyncedStatusConfig struct { + // The minimum interval between the latest synced ts and now required to reach synced state + SyncedCheckInterval int64 `toml:"synced-check-interval" json:"synced-check-interval"` + // The maximum interval between latest checkpoint ts and now or + // between latest sink's checkpoint ts and puller's checkpoint ts required to reach synced state + CheckpointInterval int64 `toml:"checkpoint-interval" json:"checkpoint-interval"` +} diff --git a/pkg/eventservice/event_service.go b/pkg/eventservice/event_service.go index b42d12659..dbd8c5b80 100644 --- a/pkg/eventservice/event_service.go +++ b/pkg/eventservice/event_service.go @@ -10,9 +10,9 @@ import ( "github.com/flowbehappy/tigate/logservice/schemastore" "github.com/flowbehappy/tigate/pkg/common" appcontext "github.com/flowbehappy/tigate/pkg/common/context" + "github.com/flowbehappy/tigate/pkg/config" "github.com/flowbehappy/tigate/pkg/messaging" "github.com/pingcap/log" - "github.com/pingcap/tiflow/pkg/config" "go.uber.org/zap" ) diff --git a/pkg/filter/expr_filter.go b/pkg/filter/expr_filter.go index 5ff6a58b3..f5c257d48 100644 --- a/pkg/filter/expr_filter.go +++ b/pkg/filter/expr_filter.go @@ -17,6 +17,7 @@ import ( "strings" "sync" + "github.com/flowbehappy/tigate/pkg/config" "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tidb/pkg/expression" @@ -28,7 +29,6 @@ import ( tfilter "github.com/pingcap/tidb/pkg/util/table-filter" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/dm/pkg/utils" - "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "go.uber.org/zap" ) diff --git a/pkg/filter/filter.go b/pkg/filter/filter.go index 1b649d53e..10dab2f67 100644 --- a/pkg/filter/filter.go +++ b/pkg/filter/filter.go @@ -18,11 +18,11 @@ import ( "github.com/flowbehappy/tigate/pkg/apperror" commonEvent "github.com/flowbehappy/tigate/pkg/common/event" + "github.com/flowbehappy/tigate/pkg/config" "github.com/pingcap/log" timodel "github.com/pingcap/tidb/pkg/meta/model" tfilter "github.com/pingcap/tidb/pkg/util/table-filter" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/config" "go.uber.org/zap" ) diff --git 
a/pkg/filter/sql_event_filter.go b/pkg/filter/sql_event_filter.go index dd3b9c8a4..6de4039ba 100644 --- a/pkg/filter/sql_event_filter.go +++ b/pkg/filter/sql_event_filter.go @@ -14,12 +14,12 @@ package filter import ( + "github.com/flowbehappy/tigate/pkg/config" "github.com/pingcap/errors" "github.com/pingcap/log" tfilter "github.com/pingcap/tidb/pkg/util/table-filter" "github.com/pingcap/tiflow/cdc/model" bf "github.com/pingcap/tiflow/pkg/binlog-filter" - "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "go.uber.org/zap" ) diff --git a/pkg/filter/utils.go b/pkg/filter/utils.go index 908ccb60f..cee198f77 100644 --- a/pkg/filter/utils.go +++ b/pkg/filter/utils.go @@ -16,11 +16,11 @@ package filter import ( "fmt" + "github.com/flowbehappy/tigate/pkg/config" timodel "github.com/pingcap/tidb/pkg/meta/model" tifilter "github.com/pingcap/tidb/pkg/util/filter" tfilter "github.com/pingcap/tidb/pkg/util/table-filter" bf "github.com/pingcap/tiflow/pkg/binlog-filter" - "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" ) diff --git a/pkg/messaging/message.go b/pkg/messaging/message.go index 70d753fa5..be2cd9f66 100644 --- a/pkg/messaging/message.go +++ b/pkg/messaging/message.go @@ -4,6 +4,7 @@ import ( "fmt" "time" + "github.com/flowbehappy/tigate/pkg/config" "github.com/flowbehappy/tigate/pkg/node" "github.com/flowbehappy/tigate/eventpb" @@ -13,7 +14,6 @@ import ( commonEvent "github.com/flowbehappy/tigate/pkg/common/event" "github.com/pingcap/log" bf "github.com/pingcap/tiflow/pkg/binlog-filter" - "github.com/pingcap/tiflow/pkg/config" "go.uber.org/zap" ) diff --git a/pkg/metrics/statistics.go b/pkg/metrics/statistics.go index 7a36eaf11..ee69134ff 100644 --- a/pkg/metrics/statistics.go +++ b/pkg/metrics/statistics.go @@ -16,8 +16,8 @@ package metrics import ( "time" + "github.com/flowbehappy/tigate/pkg/config" "github.com/pingcap/tiflow/cdc/model" - cdcconfig "github.com/pingcap/tiflow/pkg/config" commonEvent "github.com/flowbehappy/tigate/pkg/common/event" "github.com/prometheus/client_golang/prometheus" @@ -30,7 +30,7 @@ func NewStatistics( ) *Statistics { statistics := &Statistics{ sinkType: sinkType, - captureAddr: cdcconfig.GetGlobalServerConfig().AdvertiseAddr, + captureAddr: config.GetGlobalServerConfig().AdvertiseAddr, changefeedID: changefeed, } diff --git a/pkg/node/coordinator.go b/pkg/node/coordinator.go index 888c97acf..920473820 100644 --- a/pkg/node/coordinator.go +++ b/pkg/node/coordinator.go @@ -16,6 +16,7 @@ package node import ( "context" + "github.com/flowbehappy/tigate/pkg/config" "github.com/pingcap/tiflow/cdc/model" ) @@ -30,11 +31,11 @@ type Coordinator interface { // Run handles messages Run(ctx context.Context) error // ListChangefeeds returns all changefeeds - ListChangefeeds(ctx context.Context) ([]*model.ChangeFeedInfo, []*model.ChangeFeedStatus, error) + ListChangefeeds(ctx context.Context) ([]*config.ChangeFeedInfo, []*model.ChangeFeedStatus, error) // GetChangefeed returns a changefeed - GetChangefeed(ctx context.Context, id model.ChangeFeedID) (*model.ChangeFeedInfo, *model.ChangeFeedStatus, error) + GetChangefeed(ctx context.Context, id model.ChangeFeedID) (*config.ChangeFeedInfo, *model.ChangeFeedStatus, error) // CreateChangefeed creates a new changefeed - CreateChangefeed(ctx context.Context, info *model.ChangeFeedInfo) error + CreateChangefeed(ctx context.Context, info *config.ChangeFeedInfo) error // RemoveChangefeed removes a changefeed RemoveChangefeed(ctx context.Context, id 
model.ChangeFeedID) (uint64, error) // PauseChangefeed pauses a changefeed @@ -42,5 +43,5 @@ type Coordinator interface { // ResumeChangefeed resumes a changefeed ResumeChangefeed(ctx context.Context, id model.ChangeFeedID, newCheckpointTs uint64) error // UpdateChangefeed updates a changefeed - UpdateChangefeed(ctx context.Context, change *model.ChangeFeedInfo) error + UpdateChangefeed(ctx context.Context, change *config.ChangeFeedInfo) error } diff --git a/pkg/sink/codec/avro/arvo.go b/pkg/sink/codec/avro/arvo.go index fa646732a..188e370d2 100644 --- a/pkg/sink/codec/avro/arvo.go +++ b/pkg/sink/codec/avro/arvo.go @@ -34,7 +34,6 @@ import ( "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util/rowcodec" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" ticommon "github.com/pingcap/tiflow/pkg/sink/codec/common" "github.com/tikv/client-go/v2/oracle" diff --git a/pkg/sink/codec/avro/glue_schema_registry.go b/pkg/sink/codec/avro/glue_schema_registry.go index 97930ca30..6e398ddb9 100644 --- a/pkg/sink/codec/avro/glue_schema_registry.go +++ b/pkg/sink/codec/avro/glue_schema_registry.go @@ -25,11 +25,11 @@ import ( "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/glue" "github.com/aws/aws-sdk-go-v2/service/glue/types" + "github.com/flowbehappy/tigate/pkg/config" "github.com/google/uuid" "github.com/linkedin/goavro/v2" "github.com/pingcap/errors" "github.com/pingcap/log" - "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/sink/codec/common" "go.uber.org/zap" diff --git a/pkg/sink/codec/canal/encoder.go b/pkg/sink/codec/canal/encoder.go index 7661da00f..fd064286e 100644 --- a/pkg/sink/codec/canal/encoder.go +++ b/pkg/sink/codec/canal/encoder.go @@ -22,6 +22,7 @@ import ( newcommon "github.com/flowbehappy/tigate/pkg/sink/codec/common" "github.com/flowbehappy/tigate/pkg/sink/codec/encoder" "github.com/flowbehappy/tigate/pkg/sink/codec/internal" + "github.com/flowbehappy/tigate/pkg/sink/kafka/claimcheck" "github.com/goccy/go-json" "github.com/mailru/easyjson/jwriter" "github.com/pingcap/errors" @@ -31,7 +32,6 @@ import ( cerror "github.com/pingcap/tiflow/pkg/errors" ticommon "github.com/pingcap/tiflow/pkg/sink/codec/common" "github.com/pingcap/tiflow/pkg/sink/codec/utils" - "github.com/pingcap/tiflow/pkg/sink/kafka/claimcheck" "go.uber.org/zap" "golang.org/x/text/encoding" "golang.org/x/text/encoding/charmap" @@ -386,9 +386,6 @@ func NewJSONRowEventEncoder(ctx context.Context, config *newcommon.Config) (enco if err != nil { return nil, errors.Trace(err) } - if err != nil { - return nil, errors.Trace(err) - } return &JSONRowEventEncoder{ messages: make([]*ticommon.Message, 0, 1), bytesDecoder: charmap.ISO8859_1.NewDecoder(), diff --git a/pkg/sink/codec/common/config.go b/pkg/sink/codec/common/config.go index 25c76ff4b..2927cac0c 100644 --- a/pkg/sink/codec/common/config.go +++ b/pkg/sink/codec/common/config.go @@ -18,13 +18,12 @@ import ( "net/url" "time" - ticonfig "github.com/flowbehappy/tigate/pkg/config" + "github.com/flowbehappy/tigate/pkg/config" "github.com/gin-gonic/gin/binding" "github.com/imdario/mergo" "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" @@ -55,7 +54,7 @@ type Config struct { AvroConfluentSchemaRegistry string 
AvroDecimalHandlingMode string AvroBigintUnsignedHandlingMode string - AvroGlueSchemaRegistry *ticonfig.GlueSchemaRegistryConfig + AvroGlueSchemaRegistry *config.GlueSchemaRegistryConfig // EnableWatermarkEvent set to true, avro encode DDL and checkpoint event // and send to the downstream kafka, they cannot be consumed by the confluent official consumer // and would cause error, so this is only used for ticdc internal testing purpose, should not be @@ -175,7 +174,7 @@ type urlConfig struct { } // Apply fills the Config -func (c *Config) Apply(sinkURI *url.URL, sinkConfig *ticonfig.SinkConfig) error { +func (c *Config) Apply(sinkURI *url.URL, sinkConfig *config.SinkConfig) error { req := &http.Request{URL: sinkURI} var err error urlParameter := &urlConfig{} @@ -238,11 +237,6 @@ func (c *Config) Apply(sinkURI *url.URL, sinkConfig *ticonfig.SinkConfig) error if sinkConfig.KafkaConfig != nil && sinkConfig.KafkaConfig.LargeMessageHandle != nil { c.LargeMessageHandle = sinkConfig.KafkaConfig.LargeMessageHandle } - if !c.LargeMessageHandle.Disabled() && sinkConfig.ForceReplicate { - return cerror.ErrCodecInvalidConfig.GenWithStack( - `force-replicate must be disabled, when the large message handle is enabled, large message handle: "%s"`, - c.LargeMessageHandle.LargeMessageHandleOption) - } if sinkConfig.OpenProtocol != nil { c.OpenOutputOldValue = sinkConfig.OpenProtocol.OutputOldValue } @@ -292,7 +286,7 @@ func (c *Config) Apply(sinkURI *url.URL, sinkConfig *ticonfig.SinkConfig) error } func mergeConfig( - sinkConfig *ticonfig.SinkConfig, + sinkConfig *config.SinkConfig, urlParameters *urlConfig, ) (*urlConfig, error) { dest := &urlConfig{} diff --git a/pkg/sink/codec/common/message.go b/pkg/sink/codec/common/message.go index 6ee0d52b8..dcd1c43d5 100644 --- a/pkg/sink/codec/common/message.go +++ b/pkg/sink/codec/common/message.go @@ -18,8 +18,8 @@ import ( "encoding/json" "time" + "github.com/flowbehappy/tigate/pkg/config" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/config" "github.com/tikv/client-go/v2/oracle" ) diff --git a/pkg/sink/codec/encoder_builder.go b/pkg/sink/codec/encoder_builder.go index fe1d10c42..ada590976 100644 --- a/pkg/sink/codec/encoder_builder.go +++ b/pkg/sink/codec/encoder_builder.go @@ -3,11 +3,11 @@ package codec import ( "context" + "github.com/flowbehappy/tigate/pkg/config" "github.com/flowbehappy/tigate/pkg/sink/codec/canal" "github.com/flowbehappy/tigate/pkg/sink/codec/common" "github.com/flowbehappy/tigate/pkg/sink/codec/encoder" "github.com/flowbehappy/tigate/pkg/sink/codec/open" - "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" ) diff --git a/pkg/sink/codec/encoder_group.go b/pkg/sink/codec/encoder_group.go index 8d3aa5228..8801a92de 100644 --- a/pkg/sink/codec/encoder_group.go +++ b/pkg/sink/codec/encoder_group.go @@ -20,13 +20,12 @@ import ( "time" commonEvent "github.com/flowbehappy/tigate/pkg/common/event" - ticonfig "github.com/flowbehappy/tigate/pkg/config" + "github.com/flowbehappy/tigate/pkg/config" newCommon "github.com/flowbehappy/tigate/pkg/sink/codec/common" "github.com/flowbehappy/tigate/pkg/sink/codec/encoder" "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/config" ticommon "github.com/pingcap/tiflow/pkg/sink/codec/common" "github.com/pingcap/tiflow/pkg/util" "go.uber.org/zap" @@ -68,7 +67,7 @@ type encoderGroup struct { // NewEncoderGroup creates a new EncoderGroup instance func NewEncoderGroup( ctx 
context.Context, - cfg *ticonfig.SinkConfig, + cfg *config.SinkConfig, encoderConfig *newCommon.Config, changefeedID model.ChangeFeedID, ) *encoderGroup { diff --git a/pkg/sink/codec/open/encoder.go b/pkg/sink/codec/open/encoder.go index bde88b800..df7562e96 100644 --- a/pkg/sink/codec/open/encoder.go +++ b/pkg/sink/codec/open/encoder.go @@ -7,13 +7,13 @@ import ( commonEvent "github.com/flowbehappy/tigate/pkg/common/event" newcommon "github.com/flowbehappy/tigate/pkg/sink/codec/common" "github.com/flowbehappy/tigate/pkg/sink/codec/encoder" + "github.com/flowbehappy/tigate/pkg/sink/kafka/claimcheck" "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" ticommon "github.com/pingcap/tiflow/pkg/sink/codec/common" - "github.com/pingcap/tiflow/pkg/sink/kafka/claimcheck" "go.uber.org/zap" ) diff --git a/pkg/sink/kafka/claimcheck/claim_check.go b/pkg/sink/kafka/claimcheck/claim_check.go new file mode 100644 index 000000000..e3f8a5c59 --- /dev/null +++ b/pkg/sink/kafka/claimcheck/claim_check.go @@ -0,0 +1,127 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package claimcheck + +import ( + "context" + "encoding/json" + "strings" + "time" + + "github.com/flowbehappy/tigate/pkg/config" + "github.com/google/uuid" + "github.com/pingcap/log" + "github.com/pingcap/tidb/br/pkg/storage" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/sink/codec/common" + "github.com/pingcap/tiflow/pkg/util" + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" +) + +const ( + defaultTimeout = 5 * time.Minute +) + +// ClaimCheck manages sending messages to the claim-check external storage. +type ClaimCheck struct { + storage storage.ExternalStorage + rawValue bool + + changefeedID model.ChangeFeedID + // metricSendMessageDuration tracks the time spent + // sending messages to the claim-check external storage. + metricSendMessageDuration prometheus.Observer + metricSendMessageCount prometheus.Counter +} + +// New returns a new ClaimCheck; it returns (nil, nil) when claim check is not enabled. 
+func New(ctx context.Context, config *config.LargeMessageHandleConfig, changefeedID model.ChangeFeedID) (*ClaimCheck, error) { + if !config.EnableClaimCheck() { + return nil, nil + } + + log.Info("claim check enabled, start create the external storage", + zap.String("namespace", changefeedID.Namespace), + zap.String("changefeed", changefeedID.ID), + zap.String("storageURI", util.MaskSensitiveDataInURI(config.ClaimCheckStorageURI))) + + start := time.Now() + externalStorage, err := util.GetExternalStorageWithTimeout(ctx, config.ClaimCheckStorageURI, defaultTimeout) + if err != nil { + log.Error("create external storage failed", + zap.String("namespace", changefeedID.Namespace), + zap.String("changefeed", changefeedID.ID), + zap.String("storageURI", util.MaskSensitiveDataInURI(config.ClaimCheckStorageURI)), + zap.Duration("duration", time.Since(start)), + zap.Error(err)) + return nil, errors.Trace(err) + } + + log.Info("claim-check create the external storage success", + zap.String("namespace", changefeedID.Namespace), + zap.String("changefeed", changefeedID.ID), + zap.String("storageURI", util.MaskSensitiveDataInURI(config.ClaimCheckStorageURI)), + zap.Duration("duration", time.Since(start))) + + return &ClaimCheck{ + changefeedID: changefeedID, + storage: externalStorage, + rawValue: config.ClaimCheckRawValue, + metricSendMessageDuration: claimCheckSendMessageDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID), + metricSendMessageCount: claimCheckSendMessageCount.WithLabelValues(changefeedID.Namespace, changefeedID.ID), + }, nil +} + +// WriteMessage writes the message to the claim-check external storage. +func (c *ClaimCheck) WriteMessage(ctx context.Context, key, value []byte, fileName string) (err error) { + // Unless raw value is requested, wrap key and value into a ClaimCheckMessage envelope. + if !c.rawValue { + m := common.ClaimCheckMessage{ + Key: key, + Value: value, + } + value, err = json.Marshal(m) + if err != nil { + return errors.Trace(err) + } + } + start := time.Now() + err = c.storage.WriteFile(ctx, fileName, value) + if err != nil { + return errors.Trace(err) + } + c.metricSendMessageDuration.Observe(time.Since(start).Seconds()) + c.metricSendMessageCount.Inc() + return nil +} + +// FileNameWithPrefix returns the file name prefixed with the storage URI, i.e. the full path. +func (c *ClaimCheck) FileNameWithPrefix(fileName string) string { + return strings.TrimSuffix(c.storage.URI(), "/") + "/" + fileName +} + +// CleanMetrics cleans up the metrics of the claim check. +func (c *ClaimCheck) CleanMetrics() { + claimCheckSendMessageDuration.DeleteLabelValues(c.changefeedID.Namespace, c.changefeedID.ID) + claimCheckSendMessageCount.DeleteLabelValues(c.changefeedID.Namespace, c.changefeedID.ID) +} + +// NewFileName returns the file name for the message delivered to the external storage system. +// UUID V4 is used to generate random and unique file names. +// This should not exceed the S3 object name length limit. +// ref https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-keys.html +func NewFileName() string { + return uuid.NewString() + ".json" +}
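Putting the new package together, the intended write path for one oversized message looks roughly like this. A sketch under assumed values: the storage URI and changefeed ID are placeholders, and New would fail at storage creation if the URI were not reachable (illustrative only, not part of the patch):

package example

import (
	"context"

	"github.com/flowbehappy/tigate/pkg/config"
	"github.com/flowbehappy/tigate/pkg/sink/kafka/claimcheck"
	"github.com/pingcap/tiflow/cdc/model"
)

// sendLargeMessage writes value to external storage and returns the
// claim-check location to embed in the downstream Kafka message.
func sendLargeMessage(ctx context.Context, key, value []byte) (string, error) {
	cfg := &config.LargeMessageHandleConfig{
		LargeMessageHandleOption: config.LargeMessageHandleOptionClaimCheck,
		ClaimCheckStorageURI:     "s3://bucket/claim-check", // placeholder
	}
	cc, err := claimcheck.New(ctx, cfg, model.DefaultChangeFeedID("example"))
	if err != nil {
		return "", err
	}
	// New returns (nil, nil) when claim check is not enabled, so callers
	// must guard against a nil ClaimCheck.
	if cc == nil {
		return "", nil
	}
	fileName := claimcheck.NewFileName()
	if err := cc.WriteMessage(ctx, key, value, fileName); err != nil {
		return "", err
	}
	return cc.FileNameWithPrefix(fileName), nil
}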
diff --git a/pkg/sink/kafka/claimcheck/metrics.go b/pkg/sink/kafka/claimcheck/metrics.go new file mode 100644 index 000000000..8e3f614fd --- /dev/null +++ b/pkg/sink/kafka/claimcheck/metrics.go @@ -0,0 +1,43 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package claimcheck + +import "github.com/prometheus/client_golang/prometheus" + +var ( + // claimCheckSendMessageDuration records the duration of sending messages to the external claim-check storage. + claimCheckSendMessageDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "ticdc", + Subsystem: "sink", + Name: "mq_claim_check_send_message_duration", + Help: "Duration(s) for MQ worker send message to the external claim-check storage.", + Buckets: prometheus.ExponentialBuckets(0.001, 2, 20), // 1ms~524s + }, []string{"namespace", "changefeed"}) + + // claimCheckSendMessageCount records the total count of messages sent to the external claim-check storage. + claimCheckSendMessageCount = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "ticdc", + Subsystem: "sink", + Name: "mq_claim_check_send_message_count", + Help: "The total count of messages sent to the external claim-check storage.", + }, []string{"namespace", "changefeed"}) +) + +// InitMetrics registers all claim check related metrics +func InitMetrics(registry *prometheus.Registry) { + registry.MustRegister(claimCheckSendMessageDuration) + registry.MustRegister(claimCheckSendMessageCount) +} diff --git a/pkg/sink/kafka/options.go b/pkg/sink/kafka/options.go index 1f15cb5d5..c6729d53f 100644 --- a/pkg/sink/kafka/options.go +++ b/pkg/sink/kafka/options.go @@ -11,14 +11,12 @@ import ( "strings" "time" - "github.com/pingcap/errors" - - ticonfig "github.com/flowbehappy/tigate/pkg/config" + "github.com/flowbehappy/tigate/pkg/config" "github.com/gin-gonic/gin/binding" "github.com/imdario/mergo" + "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/security" tikafka "github.com/pingcap/tiflow/pkg/sink/kafka" @@ -212,7 +210,7 @@ func (o *Options) SetPartitionNum(realPartitionCount int32) error { // Apply the sinkURI to update Options func (o *Options) Apply(changefeedID model.ChangeFeedID, - sinkURI *url.URL, sinkConfig *ticonfig.SinkConfig, + sinkURI *url.URL, sinkConfig *config.SinkConfig, ) error { o.BrokerEndpoints = strings.Split(sinkURI.Host, ",") @@ -312,7 +310,7 @@ func (o *Options) Apply(changefeedID model.ChangeFeedID, } func mergeConfig( - sinkConfig *ticonfig.SinkConfig, + sinkConfig *config.SinkConfig, urlParameters *urlConfig, ) (*urlConfig, error) { dest := &urlConfig{} @@ -399,7 +397,7 @@ func (o *Options) applyTLS(params *urlConfig) error { return nil } -func (o *Options) applySASL(urlParameter *urlConfig, sinkConfig *ticonfig.SinkConfig) error { +func (o *Options) applySASL(urlParameter *urlConfig, sinkConfig *config.SinkConfig) error { if urlParameter.SASLUser != nil && *urlParameter.SASLUser != "" { o.SASL.SASLUser = *urlParameter.SASLUser } diff --git a/pkg/sink/util/helper.go b/pkg/sink/util/helper.go index f44aec762..8526a1d23 100644 --- a/pkg/sink/util/helper.go +++ b/pkg/sink/util/helper.go @@ -6,11 +6,11 @@ import ( "github.com/flowbehappy/tigate/heartbeatpb" commonEvent "github.com/flowbehappy/tigate/pkg/common/event" + "github.com/flowbehappy/tigate/pkg/config" ticonfig 
"github.com/flowbehappy/tigate/pkg/config" "github.com/flowbehappy/tigate/pkg/sink/codec/common" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/util" ) diff --git a/server/server_prepare.go b/server/server_prepare.go index 5781d2131..5f9029964 100644 --- a/server/server_prepare.go +++ b/server/server_prepare.go @@ -16,19 +16,19 @@ package server import ( "context" "fmt" - "github.com/flowbehappy/tigate/pkg/node" "os" "path/filepath" "strings" "time" "github.com/dustin/go-humanize" + "github.com/flowbehappy/tigate/pkg/config" + "github.com/flowbehappy/tigate/pkg/node" "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tidb/pkg/util/gctuner" "github.com/pingcap/tiflow/cdc/kv" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/etcd" "github.com/pingcap/tiflow/pkg/fsutil" diff --git a/server/watcher/module_node_manager.go b/server/watcher/module_node_manager.go index 8a2e17c8f..190ef9a04 100644 --- a/server/watcher/module_node_manager.go +++ b/server/watcher/module_node_manager.go @@ -19,10 +19,10 @@ import ( "sync/atomic" "time" + "github.com/flowbehappy/tigate/pkg/config" "github.com/flowbehappy/tigate/pkg/node" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" - "github.com/pingcap/tiflow/pkg/config" "github.com/pingcap/tiflow/pkg/etcd" "github.com/pingcap/tiflow/pkg/orchestrator" "go.etcd.io/etcd/client/v3/concurrency"