Skip to content

Commit

Permalink
src: add long query statistic
Browse files Browse the repository at this point in the history
  • Loading branch information
hustjieke authored and BohuTANG committed Sep 17, 2018
1 parent cb88139 commit 25cb9be
Show file tree
Hide file tree
Showing 9 changed files with 200 additions and 6 deletions.
2 changes: 1 addition & 1 deletion conf/radon.default.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@
"level": "INFO"
},
"monitor": {
"monitor-address": "0.0.0.0:13308"
"monitor-address": "0.0.0.0:13380"
}
}
3 changes: 2 additions & 1 deletion makefile
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ allpkgs = xbase/...\
proxy\
audit\
syncer\
binlog
binlog\
monitor
coverage:
go build -v -o bin/gotestcover \
src/vendor/github.com/pierrre/gotestcover/*.go;
Expand Down
4 changes: 3 additions & 1 deletion src/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type ProxyConfig struct {
QueryTimeout int `json:"query-timeout"`
PeerAddress string `json:"peer-address,omitempty"`
BackupDefaultEngine string `json:"backup-default-engine"`
LongQueryTime int `json:"long-query-time"`
}

// DefaultProxyConfig returns default proxy config.
Expand All @@ -43,6 +44,7 @@ func DefaultProxyConfig() *ProxyConfig {
QueryTimeout: 5 * 60 * 1000, // 5minutes
PeerAddress: "127.0.0.1:8080",
BackupDefaultEngine: "TokuDB", // Default MySQL storage engine for backup.
LongQueryTime: 5, // 5 seconds
}
}

Expand Down Expand Up @@ -153,7 +155,7 @@ type MonitorConfig struct {
// DefaultMonitorConfig returns default monitor config.
func DefaultMonitorConfig() *MonitorConfig {
return &MonitorConfig{
MonitorAddress: "0.0.0.0:13308",
MonitorAddress: "0.0.0.0:13380",
}
}

Expand Down
16 changes: 15 additions & 1 deletion src/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ var (
},
[]string{"description"},
)

slowQueryTotalCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "slow_query_total",
Help: "Counter of slow queries.",
},
[]string{"command", "result"},
)
)

func init() {
Expand All @@ -69,13 +77,14 @@ func init() {
prometheus.MustRegister(queryTotalCounter)
prometheus.MustRegister(backendNum)
prometheus.MustRegister(diskUsage)
prometheus.MustRegister(slowQueryTotalCounter)
}

// Start monitor
func Start(log *xlog.Log, conf *config.Config) {
webMonitorIP, webMonitorPort, err := net.SplitHostPort(conf.Monitor.MonitorAddress)
if err != nil {
log.Error("monitor.start.splithostport[%v].error:[%v]", conf.Monitor.MonitorAddress, err)
panic(err)
}

log.Info("[prometheus metrics]:\thttp://{%s}:%s%s\n",
Expand Down Expand Up @@ -126,3 +135,8 @@ func BackendDec(btype string) {
func DiskUsageSet(v float64) {
diskUsage.WithLabelValues("percent").Set(v)
}

// SlowQueryTotalCounterInc add 1
func SlowQueryTotalCounterInc(command string, result string) {
slowQueryTotalCounter.WithLabelValues(command, result).Inc()
}
40 changes: 40 additions & 0 deletions src/monitor/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@ package monitor
import (
"testing"

"config"

dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"github.com/xelabs/go-mysqlstack/xlog"
)

func TestClientConnectionIncDec(t *testing.T) {
Expand Down Expand Up @@ -120,3 +123,40 @@ func TestDiskUsageSet(t *testing.T) {

assert.EqualValues(t, v, r)
}

func TestSlowQueryTotalCounterInc(t *testing.T) {
// sql supported
command := "Select"
result := "OK"
SlowQueryTotalCounterInc(command, result)
SlowQueryTotalCounterInc(command, result)

var m dto.Metric
g, _ := queryTotalCounter.GetMetricWithLabelValues(command, result)
g.Write(&m)
v := m.GetCounter().GetValue()
assert.EqualValues(t, 2, v)

// sql not supported
command = "Unsupport"
result = "Error"
SlowQueryTotalCounterInc(command, result)

g, _ = queryTotalCounter.GetMetricWithLabelValues(command, result)
g.Write(&m)
v = m.GetCounter().GetValue()

assert.EqualValues(t, 1, v)
}

func TestMonitorStart(t *testing.T) {
log := xlog.NewStdLog(xlog.Level(xlog.ERROR))
var conf config.Config
conf.Proxy = config.DefaultProxyConfig()
conf.Binlog = config.DefaultBinlogConfig()
conf.Audit = config.DefaultAuditConfig()
conf.Router = config.DefaultRouterConfig()
conf.Log = config.DefaultLogConfig()
conf.Monitor = config.DefaultMonitorConfig()
Start(log, &conf)
}
40 changes: 40 additions & 0 deletions src/proxy/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,43 @@ func TestProxyInsertQuerys(t *testing.T) {
assert.Nil(t, err)
}
}

func TestProxyLongTimeQuerys(t *testing.T) {
log := xlog.NewStdLog(xlog.Level(xlog.PANIC))
fakedbs, proxy, cleanup := MockProxy(log)
defer cleanup()
address := proxy.Address()
proxy.SetLongQueryTime(0)

// fakedbs.
{
fakedbs.AddQueryPattern("create table .*", &sqltypes.Result{})
fakedbs.AddQueryPattern("insert .*", &sqltypes.Result{})
}

tables := []string{
"create table test.t1(id int, b int) partition by hash(id)",
"create table test.t2(id datetime, b int) partition by hash(id)",
"create table test.t3(id varchar(200), b int) partition by hash(id)",
"create table test.t4(id decimal, b int) partition by hash(id)",
"create table test.t5(id float, b int) partition by hash(id)",
}

for _, table := range tables {
client, err := driver.NewConn("mock", "mock", address, "", "utf8")
assert.Nil(t, err)
_, err = client.FetchAll(table, -1)
assert.Nil(t, err)
}

querysError := []string{
"insert into test.t6(id, b) values(1, 1)",
"insert into test.t7(id, b) values(1, 1)",
}
for _, query := range querysError {
client, err := driver.NewConn("mock", "mock", address, "", "utf8")
assert.Nil(t, err)
_, err = client.FetchAll(query, -1)
assert.NotNil(t, err)
}
}
8 changes: 8 additions & 0 deletions src/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,14 @@ func (p *Proxy) SetQueryTimeout(timeout int) {
p.conf.Proxy.QueryTimeout = timeout
}

// Set long Query Time used to set long query time.
func (p *Proxy) SetLongQueryTime(longQueryTime int) {
p.mu.Lock()
defer p.mu.Unlock()
p.log.Info("proxy.SetQueryTimeout:[%d->%d]", p.conf.Proxy.LongQueryTime, longQueryTime)
p.conf.Proxy.LongQueryTime = longQueryTime
}

// SetTwoPC used to set twopc to enable or disable.
func (p *Proxy) SetTwoPC(enable bool) {
p.mu.Lock()
Expand Down
14 changes: 12 additions & 2 deletions src/proxy/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package proxy

import (
"strings"
"time"

"monitor"
"xbase"
Expand Down Expand Up @@ -40,6 +41,8 @@ func (spanner *Spanner) ComQuery(session *driver.Session, query string, callback
throttle := spanner.throttle
diskChecker := spanner.diskChecker
hasBackup := spanner.scatter.HasBackup()
timeStart := time.Now()
slowQueryTime := time.Duration(spanner.conf.Proxy.LongQueryTime) * time.Second

// Throttle.
throttle.Acquire()
Expand Down Expand Up @@ -78,7 +81,7 @@ func (spanner *Spanner) ComQuery(session *driver.Session, query string, callback
}

defer func() {
queryStat(node, err)
queryStat(node, timeStart, slowQueryTime, err)
}()
switch node.(type) {
case *sqlparser.Use:
Expand Down Expand Up @@ -314,7 +317,7 @@ func (spanner *Spanner) IsDDL(node sqlparser.Statement) bool {
return false
}

func queryStat(node sqlparser.Statement, err error) {
func queryStat(node sqlparser.Statement, timeStart time.Time, slowQueryTime time.Duration, err error) {
var command string
switch node.(type) {
case *sqlparser.Use:
Expand Down Expand Up @@ -342,9 +345,16 @@ func queryStat(node sqlparser.Statement, err error) {
default:
command = "Unsupport"
}
queryTime := time.Since(timeStart)
if err != nil {
if queryTime > slowQueryTime {
monitor.SlowQueryTotalCounterInc(command, "Error")
}
monitor.QueryTotalCounterInc(command, "Error")
} else {
if queryTime > slowQueryTime {
monitor.SlowQueryTotalCounterInc(command, "OK")
}
monitor.QueryTotalCounterInc(command, "OK")
}
}
79 changes: 79 additions & 0 deletions src/proxy/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,85 @@ func TestProxyQueryStreamWithBackup(t *testing.T) {
}
}

// Test with long query time
func TestLongQuery(t *testing.T) {
log := xlog.NewStdLog(xlog.Level(xlog.PANIC))
fakedbs, proxy, cleanup := MockProxy(log)
defer cleanup()
address := proxy.Address()

querys := []string{
"select 1 from dual",
}
querysError := []string{
"select a a from dual",
}

// fakedbs: add a query and returns the expected result without no delay
{
fakedbs.AddQueryPattern("select 1 from dual", &sqltypes.Result{})
}

// set longQueryTime = 0s
{
proxy.SetLongQueryTime(0)
// long query success
{
client, err := driver.NewConn("mock", "mock", address, "", "utf8")
assert.Nil(t, err)
defer client.Close()

for _, query := range querys {
_, err = client.FetchAll(query, -1)
assert.Nil(t, err)
}
}
// long query failed
{
client, err := driver.NewConn("mock", "mock", address, "", "utf8")
assert.Nil(t, err)
defer client.Close()

for _, query := range querysError {
_, err = client.FetchAll(query, -1)
assert.NotNil(t, err)
}
}
}

// fakedbs: add a query and returns the expected result returned by delay 6s
{
fakedbs.AddQueryDelay("select 1 from dual", &sqltypes.Result{}, 6*1000)
}

// set longQueryTime = 5s
{
proxy.SetLongQueryTime(5)
// long query success
{
client, err := driver.NewConn("mock", "mock", address, "", "utf8")
assert.Nil(t, err)
defer client.Close()

for _, query := range querys {
_, err = client.FetchAll(query, -1)
assert.Nil(t, err)
}
}
// long query failed
{
client, err := driver.NewConn("mock", "mock", address, "", "utf8")
assert.Nil(t, err)
defer client.Close()

for _, query := range querysError {
_, err = client.FetchAll(query, -1)
assert.NotNil(t, err)
}
}
}
}

/*
func TestProxyQueryBench(t *testing.T) {
log := xlog.NewStdLog(xlog.Level(xlog.ERROR))
Expand Down

0 comments on commit 25cb9be

Please sign in to comment.