Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reparo: support pprof and metrics #1184

Merged
merged 5 commits into from
Aug 1, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ require (
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 // indirect
github.com/pingcap/kvproto v0.0.0-20211207042851-78a55fb8e69c
github.com/pingcap/log v0.0.0-20210906054005-afc726e70354
github.com/pingcap/parser v3.1.2+incompatible // indirect
github.com/pingcap/tidb v1.1.0-beta.0.20220114083142-ed1cd2a2f4d3
github.com/pingcap/tidb-tools v5.2.3-0.20211101071251-40e8f0cfcb1d+incompatible
github.com/pingcap/tidb/parser v0.0.0-20220114083142-ed1cd2a2f4d3
Expand All @@ -39,6 +40,7 @@ require (
github.com/samuel/go-zookeeper v0.0.0-20201211165307-7117e9ea2414
github.com/soheilhy/cmux v0.1.5
github.com/spf13/cobra v1.2.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965
github.com/tikv/client-go/v2 v2.0.0-rc.0.20211229051614-62d6b4a2e8f7
github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,8 @@ github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4/go.mod h1:4rbK1p9ILyIf
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v0.0.0-20210906054005-afc726e70354 h1:SvWCbCPh1YeHd9yQLksvJYAgft6wLTY1aNG81tpyscQ=
github.com/pingcap/log v0.0.0-20210906054005-afc726e70354/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pingcap/parser v3.1.2+incompatible h1:ZAtv2VBZitECpaHshSIp1bkBhEqJYerw7nO/HYsn8MM=
github.com/pingcap/parser v3.1.2+incompatible/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3/go.mod h1:tckvA041UWP+NqYzrJ3fMgC/Hw9wnmQ/tUkp/JaHly8=
github.com/pingcap/sysutil v0.0.0-20210730114356-fcd8a63f68c5/go.mod h1:XsOaV712rUk63aOEKYP9PhXTIE3FMNHmC2r1wX5wElY=
github.com/pingcap/sysutil v0.0.0-20211208032423-041a72e5860d h1:k3/APKZjXOyJrFy8VyYwRlZhMelpD3qBLJNsw3bPl/g=
Expand Down
2 changes: 2 additions & 0 deletions reparo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Config struct {
StopTSO int64 `toml:"stop-tso" json:"stop-tso"`
TxnBatch int `toml:"txn-batch" json:"txn-batch"`
WorkerCount int `toml:"worker-count" json:"worker-count"`
StatusAddr string `toml:"status-addr" json:"status-addr"`

DestType string `toml:"dest-type" json:"dest-type"`
DestDB *syncer.DBConfig `toml:"dest-db" json:"dest-db"`
Expand Down Expand Up @@ -77,6 +78,7 @@ func NewConfig() *Config {
fs.StringVar(&c.Dir, "data-dir", "", "drainer data directory path")
fs.StringVar(&c.StartDatetime, "start-datetime", "", "recovery from start-datetime, empty string means starting from the beginning of the first file")
fs.StringVar(&c.StopDatetime, "stop-datetime", "", "recovery end in stop-datetime, empty string means never end.")
fs.StringVar(&c.StatusAddr, "status-addr", ":8381", "reparo API server and pprof addr")
fs.Int64Var(&c.StartTSO, "start-tso", 0, "similar to start-datetime but in pd-server tso format")
fs.Int64Var(&c.StopTSO, "stop-tso", 0, "similar to stop-datetime, but in pd-server tso format")
fs.IntVar(&c.TxnBatch, "txn-batch", 20, "number of binlog events in a transaction batch")
Expand Down
1 change: 1 addition & 0 deletions reparo/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func Decode(r io.Reader) (*pb.Binlog, int64, error) {
if err != nil {
return nil, 0, errors.Trace(err)
}
readBinlogSizeHistogram.Observe(float64(length))

binlog := &pb.Binlog{}
err = binlog.Unmarshal(payload)
Expand Down
69 changes: 69 additions & 0 deletions reparo/http_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.

package reparo

import (
"net"
"net/http"
"net/http/pprof"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/soheilhy/cmux"
"go.uber.org/zap"
)

// cmuxReadTimeout is the read timeout handed to cmux via SetReadTimeout in
// startReparoService.
var cmuxReadTimeout = 10 * time.Second

// startHTTPServer serves reparo's status endpoints on lis and blocks until the
// HTTP server stops.
//
// Routes:
//   - /metrics exposes Prometheus metrics through promhttp.Handler.
//   - /debug/pprof/... exposes the standard net/http/pprof handlers.
//
// Errors that only signal that the listener or server was closed are treated
// as a normal shutdown; any other error is logged.
func startHTTPServer(lis net.Listener) {
	router := http.NewServeMux()
	router.Handle("/metrics", promhttp.Handler())

	router.HandleFunc("/debug/pprof/", pprof.Index)
	router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
	router.HandleFunc("/debug/pprof/profile", pprof.Profile)
	router.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
	router.HandleFunc("/debug/pprof/trace", pprof.Trace)

	httpServer := &http.Server{
		Handler: router,
		// Bound only the header phase so a client that never finishes its
		// request cannot pin a connection forever. No write timeout is set on
		// purpose: /debug/pprof/profile and /debug/pprof/trace stream their
		// response for as long as the caller asked for.
		ReadHeaderTimeout: cmuxReadTimeout,
	}

	err := errors.Cause(httpServer.Serve(lis))
	switch {
	case err == nil:
		return
	case err == http.ErrServerClosed, err == cmux.ErrListenerClosed, isErrNetClosing(err):
		// The server or its listener was closed: expected on shutdown.
		return
	}
	log.Error("reparo http handler return with error", zap.Error(err))
}

// startReparoService listens on the TCP address addr and serves reparo's
// status HTTP endpoints (see startHTTPServer) behind a cmux multiplexer.
//
// It blocks until the multiplexer stops serving. A failure to listen is
// returned annotated with "start listening". An error that only reports that
// the listener was closed is turned into nil; any other serve error is
// returned as is.
func startReparoService(addr string) error {
	rootLis, err := net.Listen("tcp", addr)
	if err != nil {
		return errors.Annotate(err, "start listening")
	}
	// Release the socket however Serve returns. Closing a listener that is
	// already closed only yields an error, which is safe to ignore here.
	defer func() {
		_ = rootLis.Close()
	}()

	// create a cmux
	m := cmux.New(rootLis)
	m.SetReadTimeout(cmuxReadTimeout) // set a timeout, ref: https://github.com/pingcap/tidb-binlog/pull/352

	httpL := m.Match(cmux.HTTP1Fast())
	go startHTTPServer(httpL)

	// Serve blocks until the root listener fails or is closed.
	if err = m.Serve(); isErrNetClosing(err) {
		return nil
	}
	return err
}

// useOfClosedErrMsg is the message fragment isErrNetClosing looks for in an
// error's text.
var useOfClosedErrMsg = "use of closed network connection"

// isErrNetClosing reports whether err is non-nil and its message contains
// useOfClosedErrMsg. The check is purely textual, so wrapped errors match as
// long as the fragment is still part of the message.
func isErrNetClosing(err error) bool {
	return err != nil && strings.Contains(err.Error(), useOfClosedErrMsg)
}
71 changes: 71 additions & 0 deletions reparo/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package reparo

import (
"github.com/prometheus/client_golang/prometheus"
)

// Prometheus collectors exposed by reparo.
//
// They are published under the "binlog"/"reparo" namespace/subsystem so that
// reparo's series cannot be mistaken for those of a drainer instance.
var (
	// eventCounter counts SQL events by "type"; it is handed to the loader
	// through loader.MetricsGroup.
	eventCounter = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "binlog",
			Subsystem: "reparo",
			Name:      "event",
			Help:      "the count of sql event(dml, ddl).",
		}, []string{"type"})

	// checkpointTSOGauge holds the physical part of the commit ts of the most
	// recently synced binlog.
	checkpointTSOGauge = prometheus.NewGauge(
		prometheus.GaugeOpts{
			Namespace: "binlog",
			Subsystem: "reparo",
			Name:      "checkpoint_tso",
			Help:      "save checkpoint tso of reparo.",
		})

	// executeHistogram records, in seconds, how long it takes from handing a
	// binlog to the syncer until its success callback fires.
	// Buckets run from 50µs up to about 6.55s.
	executeHistogram = prometheus.NewHistogram(
		prometheus.HistogramOpts{
			Namespace: "binlog",
			Subsystem: "reparo",
			Name:      "execute_duration_time",
			Help:      "Bucketed histogram of processing time (s) of an execution to sync data to downstream.",
			Buckets:   prometheus.ExponentialBuckets(0.00005, 2, 18),
		})

	// queryHistogramVec records query durations by "type"; it is handed to
	// the loader through loader.MetricsGroup.
	// Buckets run from 50µs up to about 6.55s.
	queryHistogramVec = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: "binlog",
			Subsystem: "reparo",
			Name:      "query_duration_time",
			Help:      "Bucketed histogram of processing time (s) of a query to sync data to downstream.",
			Buckets:   prometheus.ExponentialBuckets(0.00005, 2, 18),
		}, []string{"type"})

	// readBinlogSizeHistogram records the length observed in Decode for each
	// binlog read. Buckets run from 16 up to 16*2^24.
	readBinlogSizeHistogram = prometheus.NewHistogram(
		prometheus.HistogramOpts{
			Namespace: "binlog",
			Subsystem: "reparo",
			Name:      "read_binlog_size",
			Help:      "Bucketed histogram of size of a binlog.",
			Buckets:   prometheus.ExponentialBuckets(16, 2, 25),
		})

	// queueSizeGauge reports queue sizes by "name"; it is handed to the
	// loader through loader.MetricsGroup.
	queueSizeGauge = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "binlog",
			Subsystem: "reparo",
			Name:      "queue_size",
			Help:      "the size of queue",
		}, []string{"name"})
)

// init registers the process collector, the Go runtime collector and every
// reparo metric on a fresh registry, then installs that registry as
// prometheus.DefaultGatherer. MustRegister panics if a registration fails.
func init() {
	reg := prometheus.NewRegistry()
	reg.MustRegister(
		prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}),
		prometheus.NewGoCollector(),
		checkpointTSOGauge,
		eventCounter,
		executeHistogram,
		readBinlogSizeHistogram,
		queryHistogramVec,
		queueSizeGauge,
	)

	prometheus.DefaultGatherer = reg
}
20 changes: 18 additions & 2 deletions reparo/reparo.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ package reparo

import (
"io"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb-binlog/pkg/filter"
"github.com/pingcap/tidb-binlog/pkg/loader"
pb "github.com/pingcap/tidb-binlog/proto/binlog"
"github.com/pingcap/tidb-binlog/reparo/syncer"
"github.com/tikv/client-go/v2/oracle"
Expand All @@ -37,7 +39,11 @@ type Reparo struct {
func New(cfg *Config) (*Reparo, error) {
log.Info("New Reparo", zap.Stringer("config", cfg))

syncer, err := syncer.New(cfg.DestType, cfg.DestDB, cfg.WorkerCount, cfg.TxnBatch, cfg.SafeMode)
syncer, err := syncer.New(cfg.DestType, cfg.DestDB, cfg.WorkerCount, cfg.TxnBatch, cfg.SafeMode, &loader.MetricsGroup{
EventCounterVec: eventCounter,
QueryHistogramVec: queryHistogramVec,
QueueSizeGauge: queueSizeGauge,
})
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -53,6 +59,14 @@ func New(cfg *Config) (*Reparo, error) {

// Process runs the main procedure.
func (r *Reparo) Process() error {
if r.cfg.StatusAddr != "" {
go func() {
err := startReparoService(r.cfg.StatusAddr)
if err != nil {
log.Info("meet error when stopping reparo http service", zap.String("error", err.Error()))
}
}()
}
pbReader, err := newDirPbReader(r.cfg.Dir, r.cfg.StartTSO, r.cfg.StopTSO)
if err != nil {
return errors.Annotatef(err, "new reader failed dir: %s", r.cfg.Dir)
Expand All @@ -77,10 +91,12 @@ func (r *Reparo) Process() error {
if ignore {
continue
}

beginTime := time.Now()
err = r.syncer.Sync(binlog, func(binlog *pb.Binlog) {
dt := oracle.GetTimeFromTS(uint64(binlog.CommitTs))
log.Info("sync binlog success", zap.Int64("ts", binlog.CommitTs), zap.Time("datetime", dt))
checkpointTSOGauge.Set(float64(oracle.ExtractPhysical(uint64(binlog.CommitTs))))
executeHistogram.Observe(time.Since(beginTime).Seconds())
})

if err != nil {
Expand Down
12 changes: 8 additions & 4 deletions reparo/syncer/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,21 @@ var (
// should be only used for unit test to create mock db
var createDB = loader.CreateDB

func newMysqlSyncer(cfg *DBConfig, worker int, batchSize int, safemode bool) (*mysqlSyncer, error) {
func newMysqlSyncer(cfg *DBConfig, worker int, batchSize int, safemode bool, metricsGroup *loader.MetricsGroup) (*mysqlSyncer, error) {
db, err := createDB(cfg.User, cfg.Password, cfg.Host, cfg.Port, nil)
if err != nil {
return nil, errors.Trace(err)
}

return newMysqlSyncerFromSQLDB(db, worker, batchSize, safemode)
return newMysqlSyncerFromSQLDB(db, worker, batchSize, safemode, metricsGroup)
}

func newMysqlSyncerFromSQLDB(db *sql.DB, worker int, batchSize int, safemode bool) (*mysqlSyncer, error) {
loader, err := loader.NewLoader(db, loader.WorkerCount(worker), loader.BatchSize(batchSize))
func newMysqlSyncerFromSQLDB(db *sql.DB, worker int, batchSize int, safemode bool, metricsGroup *loader.MetricsGroup) (*mysqlSyncer, error) {
opts := []loader.Option{loader.WorkerCount(worker), loader.BatchSize(batchSize)}
if metricsGroup != nil {
opts = append(opts, loader.Metrics(metricsGroup))
}
loader, err := loader.NewLoader(db, opts...)
if err != nil {
return nil, errors.Annotate(err, "new loader failed")
}
Expand Down
4 changes: 2 additions & 2 deletions reparo/syncer/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"database/sql"
"time"

sqlmock "github.com/DATA-DOG/go-sqlmock"
"github.com/DATA-DOG/go-sqlmock"
"github.com/pingcap/check"
pb "github.com/pingcap/tidb-binlog/proto/binlog"
)
Expand Down Expand Up @@ -33,7 +33,7 @@ func (s *testMysqlSuite) testMysqlSyncer(c *check.C, safemode bool) {
createDB = oldCreateDB
}()

syncer, err := newMysqlSyncer(&DBConfig{}, 1, 20, safemode)
syncer, err := newMysqlSyncer(&DBConfig{}, 1, 20, safemode, nil)
c.Assert(err, check.IsNil)

mock.ExpectBegin()
Expand Down
5 changes: 3 additions & 2 deletions reparo/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package syncer
import (
"fmt"

"github.com/pingcap/tidb-binlog/pkg/loader"
pb "github.com/pingcap/tidb-binlog/proto/binlog"
)

Expand All @@ -29,10 +30,10 @@ type Syncer interface {
}

// New creates a new executor based on the name.
func New(name string, cfg *DBConfig, worker int, batchSize int, safemode bool) (Syncer, error) {
func New(name string, cfg *DBConfig, worker int, batchSize int, safemode bool, metricsGroup *loader.MetricsGroup) (Syncer, error) {
switch name {
case "mysql":
return newMysqlSyncer(cfg, worker, batchSize, safemode)
return newMysqlSyncer(cfg, worker, batchSize, safemode, metricsGroup)
case "print":
return newPrintSyncer()
case "memory":
Expand Down
2 changes: 1 addition & 1 deletion reparo/syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (s *testSyncerSuite) TestNewSyncer(c *check.C) {
}

for _, testCase := range testCases {
syncer, err := New(testCase.typeStr, cfg, 16, 20, false)
syncer, err := New(testCase.typeStr, cfg, 16, 20, false, nil)
c.Assert(err, check.IsNil)
c.Assert(reflect.TypeOf(syncer), testCase.checker, testCase.tp)
}
Expand Down