From 658ed6aae18e31fdb26f0260187f8fd00aab98b5 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Fri, 29 Jul 2022 17:36:02 +0800 Subject: [PATCH 1/5] reparo support pprof (#1182) --- reparo/config.go | 2 + reparo/decode.go | 1 + reparo/http_handler.go | 69 ++++++++++++++++++++++++++++++++++ reparo/metrics.go | 73 ++++++++++++++++++++++++++++++++++++ reparo/reparo.go | 20 +++++++++- reparo/syncer/mysql.go | 12 ++++-- reparo/syncer/mysql_test.go | 4 +- reparo/syncer/syncer.go | 5 ++- reparo/syncer/syncer_test.go | 2 +- 9 files changed, 177 insertions(+), 11 deletions(-) create mode 100644 reparo/http_handler.go create mode 100644 reparo/metrics.go diff --git a/reparo/config.go b/reparo/config.go index 0a0db749f..194c2bbb9 100644 --- a/reparo/config.go +++ b/reparo/config.go @@ -46,6 +46,7 @@ type Config struct { StopTSO int64 `toml:"stop-tso" json:"stop-tso"` TxnBatch int `toml:"txn-batch" json:"txn-batch"` WorkerCount int `toml:"worker-count" json:"worker-count"` + StatusAddr string `toml:"status-addr" json:"status-addr"` DestType string `toml:"dest-type" json:"dest-type"` DestDB *syncer.DBConfig `toml:"dest-db" json:"dest-db"` @@ -77,6 +78,7 @@ func NewConfig() *Config { fs.StringVar(&c.Dir, "data-dir", "", "drainer data directory path") fs.StringVar(&c.StartDatetime, "start-datetime", "", "recovery from start-datetime, empty string means starting from the beginning of the first file") fs.StringVar(&c.StopDatetime, "stop-datetime", "", "recovery end in stop-datetime, empty string means never end.") + fs.StringVar(&c.StatusAddr, "status-addr", ":8381", "reparo API server and pprof addr") fs.Int64Var(&c.StartTSO, "start-tso", 0, "similar to start-datetime but in pd-server tso format") fs.Int64Var(&c.StopTSO, "stop-tso", 0, "similar to stop-datetime, but in pd-server tso format") fs.IntVar(&c.TxnBatch, "txn-batch", 20, "number of binlog events in a transaction batch") diff --git a/reparo/decode.go b/reparo/decode.go index 9763bd4d6..3fcb2b558 100644 --- a/reparo/decode.go +++ b/reparo/decode.go @@ -28,6 +28,7 @@ func Decode(r io.Reader) (*pb.Binlog, int64, error) { if err != nil { return nil, 0, errors.Trace(err) } + readBinlogSizeHistogram.Observe(float64(length)) binlog := &pb.Binlog{} err = binlog.Unmarshal(payload) diff --git a/reparo/http_handler.go b/reparo/http_handler.go new file mode 100644 index 000000000..9c435ea0f --- /dev/null +++ b/reparo/http_handler.go @@ -0,0 +1,69 @@ +// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. + +package reparo + +import ( + "net" + "net/http" + "net/http/pprof" + "strings" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/soheilhy/cmux" + "go.uber.org/zap" +) + +var cmuxReadTimeout = 10 * time.Second + +func startHTTPServer(lis net.Listener) { + router := http.NewServeMux() + router.Handle("/metrics", promhttp.Handler()) + + router.HandleFunc("/debug/pprof/", pprof.Index) + router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) + router.HandleFunc("/debug/pprof/profile", pprof.Profile) + router.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + router.HandleFunc("/debug/pprof/trace", pprof.Trace) + + httpServer := &http.Server{ + Handler: router, + } + err := httpServer.Serve(lis) + err = errors.Cause(err) + if err != nil && !isErrNetClosing(err) && err != http.ErrServerClosed { + log.Info("reparo http handler return with error", zap.String("error", err.Error())) + } +} + +func startReparoService(addr string) error { + rootLis, err := net.Listen("tcp", addr) + if err != nil { + return errors.Annotate(err, "start listening") + } + + // create a cmux + m := cmux.New(rootLis) + m.SetReadTimeout(cmuxReadTimeout) // set a timeout, ref: https://github.com/pingcap/tidb-binlog/pull/352 + + httpL := m.Match(cmux.HTTP1Fast()) + go startHTTPServer(httpL) + + err = m.Serve() // start serving, block + if err != nil && isErrNetClosing(err) { + err = nil + } + return err +} + +var useOfClosedErrMsg = "use of closed network connection" + +// isErrNetClosing checks whether is an ErrNetClosing error +func isErrNetClosing(err error) bool { + if err == nil { + return false + } + return strings.Contains(err.Error(), useOfClosedErrMsg) +} diff --git a/reparo/metrics.go b/reparo/metrics.go new file mode 100644 index 000000000..08abbbe62 --- /dev/null +++ b/reparo/metrics.go @@ -0,0 +1,73 @@ +package reparo + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var ( + eventCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "binlog", + Subsystem: "drainer", + Name: "event", + Help: "the count of sql event(dml, ddl).", + }, []string{"type"}) + + checkpointTSOGauge = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: "binlog", + Subsystem: "drainer", + Name: "checkpoint_tso", + Help: "save checkpoint tso of drainer.", + }) + executeHistogram = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "binlog", + Subsystem: "drainer", + Name: "execute_duration_time", + Help: "Bucketed histogram of processing time (s) of a execute to sync data to downstream.", + Buckets: prometheus.ExponentialBuckets(0.00005, 2, 18), + }) + + queryHistogramVec = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "binlog", + Subsystem: "drainer", + Name: "query_duration_time", + Help: "Bucketed histogram of processing time (s) of a query to sync data to downstream.", + Buckets: prometheus.ExponentialBuckets(0.00005, 2, 18), + }, []string{"type"}) + + readBinlogSizeHistogram = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Namespace: "binlog", + Subsystem: "drainer", + Name: "read_binlog_size", + Help: "Bucketed histogram of size of a binlog.", + Buckets: prometheus.ExponentialBuckets(16, 2, 25), + }) + + queueSizeGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "binlog", + Subsystem: "drainer", + Name: "queue_size", + Help: "the size of queue", + }, []string{"name"}) +) + +func init() { + registry := prometheus.DefaultRegisterer + registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{})) + registry.MustRegister(prometheus.NewGoCollector()) + registry.MustRegister(checkpointTSOGauge) + registry.MustRegister(eventCounter) + registry.MustRegister(executeHistogram) + registry.MustRegister(readBinlogSizeHistogram) + registry.MustRegister(queryHistogramVec) + registry.MustRegister(queueSizeGauge) + + if gatherer, ok := registry.(prometheus.Gatherer); ok { + prometheus.DefaultGatherer = gatherer + } +} diff --git a/reparo/reparo.go b/reparo/reparo.go index 8fa4c07d5..a1d3d4c14 100644 --- a/reparo/reparo.go +++ b/reparo/reparo.go @@ -15,10 +15,12 @@ package reparo import ( "io" + "time" "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tidb-binlog/pkg/filter" + "github.com/pingcap/tidb-binlog/pkg/loader" pb "github.com/pingcap/tidb-binlog/proto/binlog" "github.com/pingcap/tidb-binlog/reparo/syncer" "github.com/tikv/client-go/v2/oracle" @@ -37,7 +39,11 @@ type Reparo struct { func New(cfg *Config) (*Reparo, error) { log.Info("New Reparo", zap.Stringer("config", cfg)) - syncer, err := syncer.New(cfg.DestType, cfg.DestDB, cfg.WorkerCount, cfg.TxnBatch, cfg.SafeMode) + syncer, err := syncer.New(cfg.DestType, cfg.DestDB, cfg.WorkerCount, cfg.TxnBatch, cfg.SafeMode, &loader.MetricsGroup{ + EventCounterVec: eventCounter, + QueryHistogramVec: queryHistogramVec, + QueueSizeGauge: queueSizeGauge, + }) if err != nil { return nil, errors.Trace(err) } @@ -53,6 +59,14 @@ func New(cfg *Config) (*Reparo, error) { // Process runs the main procedure. func (r *Reparo) Process() error { + if r.cfg.StatusAddr != "" { + go func() { + err := startReparoService(r.cfg.StatusAddr) + if err != nil { + log.Info("meet error when stopping reparo http service", zap.String("error", err.Error())) + } + }() + } pbReader, err := newDirPbReader(r.cfg.Dir, r.cfg.StartTSO, r.cfg.StopTSO) if err != nil { return errors.Annotatef(err, "new reader failed dir: %s", r.cfg.Dir) @@ -77,10 +91,12 @@ func (r *Reparo) Process() error { if ignore { continue } - + beginTime := time.Now() err = r.syncer.Sync(binlog, func(binlog *pb.Binlog) { dt := oracle.GetTimeFromTS(uint64(binlog.CommitTs)) log.Info("sync binlog success", zap.Int64("ts", binlog.CommitTs), zap.Time("datetime", dt)) + checkpointTSOGauge.Set(float64(oracle.ExtractPhysical(uint64(binlog.CommitTs)))) + executeHistogram.Observe(time.Since(beginTime).Seconds()) }) if err != nil { diff --git a/reparo/syncer/mysql.go b/reparo/syncer/mysql.go index f318eef96..c2c04866a 100644 --- a/reparo/syncer/mysql.go +++ b/reparo/syncer/mysql.go @@ -49,17 +49,21 @@ var ( // should be only used for unit test to create mock db var createDB = loader.CreateDB -func newMysqlSyncer(cfg *DBConfig, worker int, batchSize int, safemode bool) (*mysqlSyncer, error) { +func newMysqlSyncer(cfg *DBConfig, worker int, batchSize int, safemode bool, metricsGroup *loader.MetricsGroup) (*mysqlSyncer, error) { db, err := createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port, nil) if err != nil { return nil, errors.Trace(err) } - return newMysqlSyncerFromSQLDB(db, worker, batchSize, safemode) + return newMysqlSyncerFromSQLDB(db, worker, batchSize, safemode, metricsGroup) } -func newMysqlSyncerFromSQLDB(db *sql.DB, worker int, batchSize int, safemode bool) (*mysqlSyncer, error) { - loader, err := loader.NewLoader(db, loader.WorkerCount(worker), loader.BatchSize(batchSize)) +func newMysqlSyncerFromSQLDB(db *sql.DB, worker int, batchSize int, safemode bool, metricsGroup *loader.MetricsGroup) (*mysqlSyncer, error) { + opts := []loader.Option{loader.WorkerCount(worker), loader.BatchSize(batchSize)} + if metricsGroup != nil { + opts = append(opts, loader.Metrics(metricsGroup)) + } + loader, err := loader.NewLoader(db, opts...) if err != nil { return nil, errors.Annotate(err, "new loader failed") } diff --git a/reparo/syncer/mysql_test.go b/reparo/syncer/mysql_test.go index 3c93d87ea..a49641a0b 100644 --- a/reparo/syncer/mysql_test.go +++ b/reparo/syncer/mysql_test.go @@ -5,7 +5,7 @@ import ( "database/sql" "time" - sqlmock "github.com/DATA-DOG/go-sqlmock" + "github.com/DATA-DOG/go-sqlmock" "github.com/pingcap/check" pb "github.com/pingcap/tidb-binlog/proto/binlog" ) @@ -33,7 +33,7 @@ func (s *testMysqlSuite) testMysqlSyncer(c *check.C, safemode bool) { createDB = oldCreateDB }() - syncer, err := newMysqlSyncer(&DBConfig{}, 1, 20, safemode) + syncer, err := newMysqlSyncer(&DBConfig{}, 1, 20, safemode, nil) c.Assert(err, check.IsNil) mock.ExpectBegin() diff --git a/reparo/syncer/syncer.go b/reparo/syncer/syncer.go index 8e1a1810d..e679c1ce8 100644 --- a/reparo/syncer/syncer.go +++ b/reparo/syncer/syncer.go @@ -16,6 +16,7 @@ package syncer import ( "fmt" + "github.com/pingcap/tidb-binlog/pkg/loader" pb "github.com/pingcap/tidb-binlog/proto/binlog" ) @@ -29,10 +30,10 @@ type Syncer interface { } // New creates a new executor based on the name. -func New(name string, cfg *DBConfig, worker int, batchSize int, safemode bool) (Syncer, error) { +func New(name string, cfg *DBConfig, worker int, batchSize int, safemode bool, metricsGroup *loader.MetricsGroup) (Syncer, error) { switch name { case "mysql": - return newMysqlSyncer(cfg, worker, batchSize, safemode) + return newMysqlSyncer(cfg, worker, batchSize, safemode, metricsGroup) case "print": return newPrintSyncer() case "memory": diff --git a/reparo/syncer/syncer_test.go b/reparo/syncer/syncer_test.go index 4ebd8c831..395a30c1b 100644 --- a/reparo/syncer/syncer_test.go +++ b/reparo/syncer/syncer_test.go @@ -34,7 +34,7 @@ func (s *testSyncerSuite) TestNewSyncer(c *check.C) { } for _, testCase := range testCases { - syncer, err := New(testCase.typeStr, cfg, 16, 20, false) + syncer, err := New(testCase.typeStr, cfg, 16, 20, false, nil) c.Assert(err, check.IsNil) c.Assert(reflect.TypeOf(syncer), testCase.checker, testCase.tp) } From 80b791104fd015ccc8ebeecb6be357d5daa1adc3 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Fri, 29 Jul 2022 18:21:27 +0800 Subject: [PATCH 2/5] fix prometheus.registry (#1183) * reparo support pprof * fix --- reparo/metrics.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/reparo/metrics.go b/reparo/metrics.go index 08abbbe62..c169f4c9e 100644 --- a/reparo/metrics.go +++ b/reparo/metrics.go @@ -57,7 +57,7 @@ var ( ) func init() { - registry := prometheus.DefaultRegisterer + registry := prometheus.NewRegistry() registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{})) registry.MustRegister(prometheus.NewGoCollector()) registry.MustRegister(checkpointTSOGauge) @@ -67,7 +67,5 @@ func init() { registry.MustRegister(queryHistogramVec) registry.MustRegister(queueSizeGauge) - if gatherer, ok := registry.(prometheus.Gatherer); ok { - prometheus.DefaultGatherer = gatherer - } + prometheus.DefaultGatherer = registry } From a1f4d395121f48f9b33365c16f4ee2a329ff8e40 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Mon, 1 Aug 2022 10:24:56 +0800 Subject: [PATCH 3/5] update go.mod --- go.mod | 2 ++ go.sum | 2 ++ 2 files changed, 4 insertions(+) diff --git a/go.mod b/go.mod index 6f7b6f248..11e2b4526 100644 --- a/go.mod +++ b/go.mod @@ -27,6 +27,7 @@ require ( github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 // indirect github.com/pingcap/kvproto v0.0.0-20211207042851-78a55fb8e69c github.com/pingcap/log v0.0.0-20210906054005-afc726e70354 + github.com/pingcap/parser v3.1.2+incompatible // indirect github.com/pingcap/tidb v1.1.0-beta.0.20220114083142-ed1cd2a2f4d3 github.com/pingcap/tidb-tools v5.2.3-0.20211101071251-40e8f0cfcb1d+incompatible github.com/pingcap/tidb/parser v0.0.0-20220114083142-ed1cd2a2f4d3 @@ -39,6 +40,7 @@ require ( github.com/samuel/go-zookeeper v0.0.0-20201211165307-7117e9ea2414 github.com/soheilhy/cmux v0.1.5 github.com/spf13/cobra v1.2.1 // indirect + github.com/spf13/pflag v1.0.5 // indirect github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965 github.com/tikv/client-go/v2 v2.0.0-rc.0.20211229051614-62d6b4a2e8f7 github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee diff --git a/go.sum b/go.sum index 9fe68ef06..39204daa3 100644 --- a/go.sum +++ b/go.sum @@ -703,6 +703,8 @@ github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4/go.mod h1:4rbK1p9ILyIf github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v0.0.0-20210906054005-afc726e70354 h1:SvWCbCPh1YeHd9yQLksvJYAgft6wLTY1aNG81tpyscQ= github.com/pingcap/log v0.0.0-20210906054005-afc726e70354/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= +github.com/pingcap/parser v3.1.2+incompatible h1:ZAtv2VBZitECpaHshSIp1bkBhEqJYerw7nO/HYsn8MM= +github.com/pingcap/parser v3.1.2+incompatible/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA= github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3/go.mod h1:tckvA041UWP+NqYzrJ3fMgC/Hw9wnmQ/tUkp/JaHly8= github.com/pingcap/sysutil v0.0.0-20210730114356-fcd8a63f68c5/go.mod h1:XsOaV712rUk63aOEKYP9PhXTIE3FMNHmC2r1wX5wElY= github.com/pingcap/sysutil v0.0.0-20211208032423-041a72e5860d h1:k3/APKZjXOyJrFy8VyYwRlZhMelpD3qBLJNsw3bPl/g= From 11a6ee1f8e520bc01e8d5dd4711453ae4d55fed2 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Mon, 1 Aug 2022 11:44:35 +0800 Subject: [PATCH 4/5] update --- go.mod | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index 11e2b4526..dc288bf00 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,7 @@ require ( github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 // indirect github.com/pingcap/kvproto v0.0.0-20211207042851-78a55fb8e69c github.com/pingcap/log v0.0.0-20210906054005-afc726e70354 - github.com/pingcap/parser v3.1.2+incompatible // indirect + github.com/pingcap/parser v3.1.2+incompatible github.com/pingcap/tidb v1.1.0-beta.0.20220114083142-ed1cd2a2f4d3 github.com/pingcap/tidb-tools v5.2.3-0.20211101071251-40e8f0cfcb1d+incompatible github.com/pingcap/tidb/parser v0.0.0-20220114083142-ed1cd2a2f4d3 @@ -40,7 +40,7 @@ require ( github.com/samuel/go-zookeeper v0.0.0-20201211165307-7117e9ea2414 github.com/soheilhy/cmux v0.1.5 github.com/spf13/cobra v1.2.1 // indirect - github.com/spf13/pflag v1.0.5 // indirect + github.com/spf13/pflag v1.0.5 github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965 github.com/tikv/client-go/v2 v2.0.0-rc.0.20211229051614-62d6b4a2e8f7 github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee From 0ebf5298de0a8f39b08d01c58fbbbeb5246dd8b9 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Mon, 1 Aug 2022 11:46:11 +0800 Subject: [PATCH 5/5] update --- go.mod | 2 -- go.sum | 2 -- 2 files changed, 4 deletions(-) diff --git a/go.mod b/go.mod index dc288bf00..6f7b6f248 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,6 @@ require ( github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 // indirect github.com/pingcap/kvproto v0.0.0-20211207042851-78a55fb8e69c github.com/pingcap/log v0.0.0-20210906054005-afc726e70354 - github.com/pingcap/parser v3.1.2+incompatible github.com/pingcap/tidb v1.1.0-beta.0.20220114083142-ed1cd2a2f4d3 github.com/pingcap/tidb-tools v5.2.3-0.20211101071251-40e8f0cfcb1d+incompatible github.com/pingcap/tidb/parser v0.0.0-20220114083142-ed1cd2a2f4d3 @@ -40,7 +39,6 @@ require ( github.com/samuel/go-zookeeper v0.0.0-20201211165307-7117e9ea2414 github.com/soheilhy/cmux v0.1.5 github.com/spf13/cobra v1.2.1 // indirect - github.com/spf13/pflag v1.0.5 github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965 github.com/tikv/client-go/v2 v2.0.0-rc.0.20211229051614-62d6b4a2e8f7 github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee diff --git a/go.sum b/go.sum index 39204daa3..9fe68ef06 100644 --- a/go.sum +++ b/go.sum @@ -703,8 +703,6 @@ github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4/go.mod h1:4rbK1p9ILyIf github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v0.0.0-20210906054005-afc726e70354 h1:SvWCbCPh1YeHd9yQLksvJYAgft6wLTY1aNG81tpyscQ= github.com/pingcap/log v0.0.0-20210906054005-afc726e70354/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= -github.com/pingcap/parser v3.1.2+incompatible h1:ZAtv2VBZitECpaHshSIp1bkBhEqJYerw7nO/HYsn8MM= -github.com/pingcap/parser v3.1.2+incompatible/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA= github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3/go.mod h1:tckvA041UWP+NqYzrJ3fMgC/Hw9wnmQ/tUkp/JaHly8= github.com/pingcap/sysutil v0.0.0-20210730114356-fcd8a63f68c5/go.mod h1:XsOaV712rUk63aOEKYP9PhXTIE3FMNHmC2r1wX5wElY= github.com/pingcap/sysutil v0.0.0-20211208032423-041a72e5860d h1:k3/APKZjXOyJrFy8VyYwRlZhMelpD3qBLJNsw3bPl/g=