diff --git a/conf/radon.default.json b/conf/radon.default.json index 4b2302dc..d24e2778 100755 --- a/conf/radon.default.json +++ b/conf/radon.default.json @@ -14,6 +14,6 @@ "level": "INFO" }, "monitor": { - "monitor-address": "0.0.0.0:13308" + "monitor-address": "0.0.0.0:13380" } } diff --git a/makefile b/makefile index 6f17ad38..be6cbb6c 100644 --- a/makefile +++ b/makefile @@ -84,7 +84,8 @@ allpkgs = xbase/...\ proxy\ audit\ syncer\ - binlog + binlog\ + monitor coverage: go build -v -o bin/gotestcover \ src/vendor/github.com/pierrre/gotestcover/*.go; diff --git a/src/config/config.go b/src/config/config.go index b51a45d1..129a1198 100644 --- a/src/config/config.go +++ b/src/config/config.go @@ -30,6 +30,7 @@ type ProxyConfig struct { QueryTimeout int `json:"query-timeout"` PeerAddress string `json:"peer-address,omitempty"` BackupDefaultEngine string `json:"backup-default-engine"` + LongQueryTime int `json:"long-query-time"` } // DefaultProxyConfig returns default proxy config. @@ -43,6 +44,7 @@ func DefaultProxyConfig() *ProxyConfig { QueryTimeout: 5 * 60 * 1000, // 5minutes PeerAddress: "127.0.0.1:8080", BackupDefaultEngine: "TokuDB", // Default MySQL storage engine for backup. + LongQueryTime: 5, // 5 seconds } } @@ -153,7 +155,7 @@ type MonitorConfig struct { // DefaultMonitorConfig returns default monitor config. func DefaultMonitorConfig() *MonitorConfig { return &MonitorConfig{ - MonitorAddress: "0.0.0.0:13308", + MonitorAddress: "0.0.0.0:13380", } } diff --git a/src/monitor/monitor.go b/src/monitor/monitor.go index bef060f3..003e0de3 100644 --- a/src/monitor/monitor.go +++ b/src/monitor/monitor.go @@ -61,6 +61,14 @@ var ( }, []string{"description"}, ) + + slowQueryTotalCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "slow_query_total", + Help: "Counter of slow queries.", + }, + []string{"command", "result"}, + ) ) func init() { @@ -69,13 +77,14 @@ func init() { prometheus.MustRegister(queryTotalCounter) prometheus.MustRegister(backendNum) prometheus.MustRegister(diskUsage) + prometheus.MustRegister(slowQueryTotalCounter) } // Start monitor func Start(log *xlog.Log, conf *config.Config) { webMonitorIP, webMonitorPort, err := net.SplitHostPort(conf.Monitor.MonitorAddress) if err != nil { - log.Error("monitor.start.splithostport[%v].error:[%v]", conf.Monitor.MonitorAddress, err) + panic(err) } log.Info("[prometheus metrics]:\thttp://{%s}:%s%s\n", @@ -126,3 +135,8 @@ func BackendDec(btype string) { func DiskUsageSet(v float64) { diskUsage.WithLabelValues("percent").Set(v) } + +// SlowQueryTotalCounterInc add 1 +func SlowQueryTotalCounterInc(command string, result string) { + slowQueryTotalCounter.WithLabelValues(command, result).Inc() +} diff --git a/src/monitor/monitor_test.go b/src/monitor/monitor_test.go index 962bae00..28aececa 100644 --- a/src/monitor/monitor_test.go +++ b/src/monitor/monitor_test.go @@ -11,8 +11,11 @@ package monitor import ( "testing" + "config" + dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/assert" + "github.com/xelabs/go-mysqlstack/xlog" ) func TestClientConnectionIncDec(t *testing.T) { @@ -120,3 +123,40 @@ func TestDiskUsageSet(t *testing.T) { assert.EqualValues(t, v, r) } + +func TestSlowQueryTotalCounterInc(t *testing.T) { + // sql supported + command := "Select" + result := "OK" + SlowQueryTotalCounterInc(command, result) + SlowQueryTotalCounterInc(command, result) + + var m dto.Metric + g, _ := queryTotalCounter.GetMetricWithLabelValues(command, result) + g.Write(&m) + v := m.GetCounter().GetValue() + assert.EqualValues(t, 2, v) + + // sql not supported + command = "Unsupport" + result = "Error" + SlowQueryTotalCounterInc(command, result) + + g, _ = queryTotalCounter.GetMetricWithLabelValues(command, result) + g.Write(&m) + v = m.GetCounter().GetValue() + + assert.EqualValues(t, 1, v) +} + +func TestMonitorStart(t *testing.T) { + log := xlog.NewStdLog(xlog.Level(xlog.ERROR)) + var conf config.Config + conf.Proxy = config.DefaultProxyConfig() + conf.Binlog = config.DefaultBinlogConfig() + conf.Audit = config.DefaultAuditConfig() + conf.Router = config.DefaultRouterConfig() + conf.Log = config.DefaultLogConfig() + conf.Monitor = config.DefaultMonitorConfig() + Start(log, &conf) +} diff --git a/src/proxy/insert_test.go b/src/proxy/insert_test.go index dd2a30fa..9ebf60b2 100644 --- a/src/proxy/insert_test.go +++ b/src/proxy/insert_test.go @@ -93,3 +93,43 @@ func TestProxyInsertQuerys(t *testing.T) { assert.Nil(t, err) } } + +func TestProxyLongTimeQuerys(t *testing.T) { + log := xlog.NewStdLog(xlog.Level(xlog.PANIC)) + fakedbs, proxy, cleanup := MockProxy(log) + defer cleanup() + address := proxy.Address() + proxy.SetLongQueryTime(0) + + // fakedbs. + { + fakedbs.AddQueryPattern("create table .*", &sqltypes.Result{}) + fakedbs.AddQueryPattern("insert .*", &sqltypes.Result{}) + } + + tables := []string{ + "create table test.t1(id int, b int) partition by hash(id)", + "create table test.t2(id datetime, b int) partition by hash(id)", + "create table test.t3(id varchar(200), b int) partition by hash(id)", + "create table test.t4(id decimal, b int) partition by hash(id)", + "create table test.t5(id float, b int) partition by hash(id)", + } + + for _, table := range tables { + client, err := driver.NewConn("mock", "mock", address, "", "utf8") + assert.Nil(t, err) + _, err = client.FetchAll(table, -1) + assert.Nil(t, err) + } + + querysError := []string{ + "insert into test.t6(id, b) values(1, 1)", + "insert into test.t7(id, b) values(1, 1)", + } + for _, query := range querysError { + client, err := driver.NewConn("mock", "mock", address, "", "utf8") + assert.Nil(t, err) + _, err = client.FetchAll(query, -1) + assert.NotNil(t, err) + } +} diff --git a/src/proxy/proxy.go b/src/proxy/proxy.go index a87f5c48..6365bbad 100644 --- a/src/proxy/proxy.go +++ b/src/proxy/proxy.go @@ -202,6 +202,14 @@ func (p *Proxy) SetQueryTimeout(timeout int) { p.conf.Proxy.QueryTimeout = timeout } +// Set long Query Time used to set long query time. +func (p *Proxy) SetLongQueryTime(longQueryTime int) { + p.mu.Lock() + defer p.mu.Unlock() + p.log.Info("proxy.SetQueryTimeout:[%d->%d]", p.conf.Proxy.LongQueryTime, longQueryTime) + p.conf.Proxy.LongQueryTime = longQueryTime +} + // SetTwoPC used to set twopc to enable or disable. func (p *Proxy) SetTwoPC(enable bool) { p.mu.Lock() diff --git a/src/proxy/query.go b/src/proxy/query.go index 0b55632c..fd014df9 100644 --- a/src/proxy/query.go +++ b/src/proxy/query.go @@ -10,6 +10,7 @@ package proxy import ( "strings" + "time" "monitor" "xbase" @@ -40,6 +41,8 @@ func (spanner *Spanner) ComQuery(session *driver.Session, query string, callback throttle := spanner.throttle diskChecker := spanner.diskChecker hasBackup := spanner.scatter.HasBackup() + timeStart := time.Now() + slowQueryTime := time.Duration(spanner.conf.Proxy.LongQueryTime) * time.Second // Throttle. throttle.Acquire() @@ -78,7 +81,7 @@ func (spanner *Spanner) ComQuery(session *driver.Session, query string, callback } defer func() { - queryStat(node, err) + queryStat(node, timeStart, slowQueryTime, err) }() switch node.(type) { case *sqlparser.Use: @@ -314,7 +317,7 @@ func (spanner *Spanner) IsDDL(node sqlparser.Statement) bool { return false } -func queryStat(node sqlparser.Statement, err error) { +func queryStat(node sqlparser.Statement, timeStart time.Time, slowQueryTime time.Duration, err error) { var command string switch node.(type) { case *sqlparser.Use: @@ -342,9 +345,16 @@ func queryStat(node sqlparser.Statement, err error) { default: command = "Unsupport" } + queryTime := time.Since(timeStart) if err != nil { + if queryTime > slowQueryTime { + monitor.SlowQueryTotalCounterInc(command, "Error") + } monitor.QueryTotalCounterInc(command, "Error") } else { + if queryTime > slowQueryTime { + monitor.SlowQueryTotalCounterInc(command, "OK") + } monitor.QueryTotalCounterInc(command, "OK") } } diff --git a/src/proxy/query_test.go b/src/proxy/query_test.go index 8e26ef6b..bfbc9363 100644 --- a/src/proxy/query_test.go +++ b/src/proxy/query_test.go @@ -370,6 +370,85 @@ func TestProxyQueryStreamWithBackup(t *testing.T) { } } +// Test with long query time +func TestLongQuery(t *testing.T) { + log := xlog.NewStdLog(xlog.Level(xlog.PANIC)) + fakedbs, proxy, cleanup := MockProxy(log) + defer cleanup() + address := proxy.Address() + + querys := []string{ + "select 1 from dual", + } + querysError := []string{ + "select a a from dual", + } + + // fakedbs: add a query and returns the expected result without no delay + { + fakedbs.AddQueryPattern("select 1 from dual", &sqltypes.Result{}) + } + + // set longQueryTime = 0s + { + proxy.SetLongQueryTime(0) + // long query success + { + client, err := driver.NewConn("mock", "mock", address, "", "utf8") + assert.Nil(t, err) + defer client.Close() + + for _, query := range querys { + _, err = client.FetchAll(query, -1) + assert.Nil(t, err) + } + } + // long query failed + { + client, err := driver.NewConn("mock", "mock", address, "", "utf8") + assert.Nil(t, err) + defer client.Close() + + for _, query := range querysError { + _, err = client.FetchAll(query, -1) + assert.NotNil(t, err) + } + } + } + + // fakedbs: add a query and returns the expected result returned by delay 6s + { + fakedbs.AddQueryDelay("select 1 from dual", &sqltypes.Result{}, 6*1000) + } + + // set longQueryTime = 5s + { + proxy.SetLongQueryTime(5) + // long query success + { + client, err := driver.NewConn("mock", "mock", address, "", "utf8") + assert.Nil(t, err) + defer client.Close() + + for _, query := range querys { + _, err = client.FetchAll(query, -1) + assert.Nil(t, err) + } + } + // long query failed + { + client, err := driver.NewConn("mock", "mock", address, "", "utf8") + assert.Nil(t, err) + defer client.Close() + + for _, query := range querysError { + _, err = client.FetchAll(query, -1) + assert.NotNil(t, err) + } + } + } +} + /* func TestProxyQueryBench(t *testing.T) { log := xlog.NewStdLog(xlog.Level(xlog.ERROR))