Skip to content

Commit

Permalink
Fix missing metrics and verify metrics are not lost
Browse files Browse the repository at this point in the history
Signed-off-by: Davanum Srinivas <davanum@gmail.com>
  • Loading branch information
dims committed Jan 17, 2025
1 parent 3a2afb3 commit f1e0118
Show file tree
Hide file tree
Showing 5 changed files with 231 additions and 13 deletions.
2 changes: 2 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,8 @@ type ServerConfig struct {

// ServerFeatureGate is a server level feature gate
ServerFeatureGate featuregate.FeatureGate

Metrics string
}

// VerifyBootstrap sanity-checks the initial config for bootstrap case
Expand Down
14 changes: 1 addition & 13 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/soheilhy/cmux"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand Down Expand Up @@ -227,6 +226,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
V2Deprecation: cfg.V2DeprecationEffective(),
ExperimentalLocalAddress: cfg.InferLocalAddr(),
ServerFeatureGate: cfg.ServerFeatureGate,
Metrics: cfg.Metrics,
}

if srvcfg.ExperimentalEnableDistributedTracing {
Expand Down Expand Up @@ -844,18 +844,6 @@ func (e *Etcd) createMetricsListener(murl url.URL) (net.Listener, error) {
}

func (e *Etcd) serveMetrics() (err error) {
if e.cfg.Metrics == "extensive" {
var opts prometheus.HistogramOpts
serverHandledHistogram := prometheus.NewHistogramVec(
opts,
[]string{"grpc_type", "grpc_service", "grpc_method"},
)
err := prometheus.Register(serverHandledHistogram)
if err != nil {
e.GetLogger().Error("setting up prometheus metrics failed.", zap.Error(err))
}
}

if len(e.cfg.ListenMetricsUrls) > 0 {
metricsMux := http.NewServeMux()
etcdhttp.HandleMetrics(metricsMux)
Expand Down
38 changes: 38 additions & 0 deletions server/etcdserver/api/v3rpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
package v3rpc

import (
"context"
"crypto/tls"
"math"
"strings"
"time"

grpc_prometheus "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -34,6 +37,14 @@ const (
maxSendBytes = math.MaxInt32
)

func splitMethodName(fullMethodName string) (string, string) {
fullMethodName = strings.TrimPrefix(fullMethodName, "/") // remove leading slash
if i := strings.Index(fullMethodName, "/"); i >= 0 {
return fullMethodName[:i], fullMethodName[i+1:]
}
return "unknown", "unknown"

Check warning on line 45 in server/etcdserver/api/v3rpc/grpc.go

View check run for this annotation

Codecov / codecov/patch

server/etcdserver/api/v3rpc/grpc.go#L45

Added line #L45 was not covered by tests
}

func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnaryServerInterceptor, gopts ...grpc.ServerOption) *grpc.Server {
var opts []grpc.ServerOption
opts = append(opts, grpc.CustomCodec(&codec{}))
Expand All @@ -57,6 +68,33 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnarySer
newStreamInterceptor(s),
serverMetrics.StreamServerInterceptor(),
}
if s.Cfg.Metrics == "extensive" {
serverHandledHistogram := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "grpc_server_handling_seconds",
Help: "Histogram of response latency (seconds) of gRPC that had been application-level handled by the server.",
Buckets: prometheus.DefBuckets,
},
[]string{"grpc_type", "grpc_service", "grpc_method"},
)
prometheus.Register(serverHandledHistogram)

chainUnaryInterceptors = append(chainUnaryInterceptors,
func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
startTime := time.Now()
resp, err = handler(ctx, req)
grpcService, grpcMethod := splitMethodName(info.FullMethod)
serverHandledHistogram.WithLabelValues("unary", grpcService, grpcMethod).Observe(time.Since(startTime).Seconds())
return resp, err
})
chainStreamInterceptors = append(chainStreamInterceptors, func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
startTime := time.Now()
err := handler(srv, ss)
grpcService, grpcMethod := splitMethodName(info.FullMethod)
serverHandledHistogram.WithLabelValues("stream", grpcService, grpcMethod).Observe(time.Since(startTime).Seconds())
return err
})
}

if s.Cfg.ExperimentalEnableDistributedTracing {
chainUnaryInterceptors = append(chainUnaryInterceptors, otelgrpc.UnaryServerInterceptor(s.Cfg.ExperimentalTracerOptions...))
Expand Down
4 changes: 4 additions & 0 deletions tests/framework/integration/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ type ClusterConfig struct {
ExperimentalMaxLearners int
DisableStrictReconfigCheck bool
CorruptCheckTime time.Duration
Metrics string
}

type Cluster struct {
Expand Down Expand Up @@ -292,6 +293,7 @@ func (c *Cluster) MustNewMember(t testutil.TB) *Member {
ExperimentalMaxLearners: c.Cfg.ExperimentalMaxLearners,
DisableStrictReconfigCheck: c.Cfg.DisableStrictReconfigCheck,
CorruptCheckTime: c.Cfg.CorruptCheckTime,
Metrics: c.Cfg.Metrics,
})
m.DiscoveryURL = c.Cfg.DiscoveryURL
return m
Expand Down Expand Up @@ -617,6 +619,7 @@ type MemberConfig struct {
ExperimentalMaxLearners int
DisableStrictReconfigCheck bool
CorruptCheckTime time.Duration
Metrics string
}

// MustNewMember return an inited member with the given name. If peerTLS is
Expand Down Expand Up @@ -731,6 +734,7 @@ func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member {
if mcfg.ExperimentalMaxLearners != 0 {
m.ExperimentalMaxLearners = mcfg.ExperimentalMaxLearners
}
m.Metrics = mcfg.Metrics
m.V2Deprecation = config.V2_DEPR_DEFAULT
m.GRPCServerRecorder = &grpctesting.GRPCRecorder{}

Expand Down
186 changes: 186 additions & 0 deletions tests/integration/clientv3/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,3 +181,189 @@ func getHTTPBodyAsLines(t *testing.T, url string) []string {
resp.Body.Close()
return lines
}

func TestAllMetricsGenerated(t *testing.T) {
integration2.BeforeTest(t)

var (
addr = "localhost:27989"
ln net.Listener
)

srv := &http.Server{Handler: promhttp.Handler()}
srv.SetKeepAlivesEnabled(false)

ln, err := transport.NewUnixListener(addr)
if err != nil {
t.Errorf("Error: %v occurred while listening on addr: %v", err, addr)
}

donec := make(chan struct{})
defer func() {
ln.Close()
<-donec
}()

// listen for all Prometheus metrics
go func() {
defer close(donec)

serr := srv.Serve(ln)
if serr != nil && !transport.IsClosedConnError(serr) {
t.Errorf("Err serving http requests: %v", serr)
}
}()

url := "unix://" + addr + "/metrics"

clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 1, Metrics: "extensive"})
defer clus.Terminate(t)

clientMetrics := grpcprom.NewClientMetrics()
prometheus.Register(clientMetrics)

cfg := clientv3.Config{
Endpoints: []string{clus.Members[0].GRPCURL},
DialOptions: []grpc.DialOption{
grpc.WithUnaryInterceptor(clientMetrics.UnaryClientInterceptor()),
grpc.WithStreamInterceptor(clientMetrics.StreamClientInterceptor()),
},
}
cli, cerr := integration2.NewClient(t, cfg)
if cerr != nil {
t.Fatal(cerr)
}
defer cli.Close()

// Perform some operations to generate metrics
wc := cli.Watch(context.Background(), "foo")
_, err = cli.Put(context.Background(), "foo", "bar")
if err != nil {
t.Errorf("Error putting value in key store")
}

// consume watch response
select {
case <-wc:
case <-time.After(10 * time.Second):
t.Error("Timeout occurred for getting watch response")
}

// Define the expected list of metrics
expectedMetrics := []string{
"etcd_cluster_version",
"etcd_disk_backend_commit_duration_seconds_bucket",
"etcd_disk_backend_commit_duration_seconds_count",
"etcd_disk_backend_commit_duration_seconds_sum",
"etcd_disk_backend_defrag_duration_seconds_bucket",
"etcd_disk_backend_defrag_duration_seconds_count",
"etcd_disk_backend_defrag_duration_seconds_sum",
"etcd_disk_backend_snapshot_duration_seconds_bucket",
"etcd_disk_backend_snapshot_duration_seconds_count",
"etcd_disk_backend_snapshot_duration_seconds_sum",
"etcd_disk_defrag_inflight",
"etcd_disk_wal_fsync_duration_seconds_bucket",
"etcd_disk_wal_fsync_duration_seconds_count",
"etcd_disk_wal_fsync_duration_seconds_sum",
"etcd_disk_wal_write_bytes_total",
"etcd_disk_wal_write_duration_seconds_bucket",
"etcd_disk_wal_write_duration_seconds_count",
"etcd_disk_wal_write_duration_seconds_sum",
"etcd_mvcc_db_open_read_transactions",
"etcd_mvcc_db_total_size_in_bytes",
"etcd_mvcc_db_total_size_in_use_in_bytes",
"etcd_mvcc_delete_total",
"etcd_mvcc_hash_duration_seconds_bucket",
"etcd_mvcc_hash_duration_seconds_count",
"etcd_mvcc_hash_duration_seconds_sum",
"etcd_mvcc_hash_rev_duration_seconds_bucket",
"etcd_mvcc_hash_rev_duration_seconds_count",
"etcd_mvcc_hash_rev_duration_seconds_sum",
"etcd_mvcc_put_total",
"etcd_mvcc_range_total",
"etcd_mvcc_txn_total",
"etcd_network_client_grpc_received_bytes_total",
"etcd_network_client_grpc_sent_bytes_total",
"etcd_network_known_peers",
"etcd_server_apply_duration_seconds_bucket",
"etcd_server_apply_duration_seconds_count",
"etcd_server_apply_duration_seconds_sum",
"etcd_server_client_requests_total",
"etcd_server_go_version",
"etcd_server_has_leader",
"etcd_server_health_failures",
"etcd_server_health_success",
"etcd_server_heartbeat_send_failures_total",
"etcd_server_id",
"etcd_server_is_leader",
"etcd_server_is_learner",
"etcd_server_leader_changes_seen_total",
"etcd_server_learner_promote_successes",
"etcd_server_proposals_applied_total",
"etcd_server_proposals_committed_total",
"etcd_server_proposals_failed_total",
"etcd_server_proposals_pending",
"etcd_server_quota_backend_bytes",
"etcd_server_read_indexes_failed_total",
"etcd_server_slow_apply_total",
"etcd_server_slow_read_indexes_total",
"etcd_server_snapshot_apply_in_progress_total",
"etcd_server_version",
"etcd_snap_db_fsync_duration_seconds_bucket",
"etcd_snap_db_fsync_duration_seconds_count",
"etcd_snap_db_fsync_duration_seconds_sum",
"etcd_snap_db_save_total_duration_seconds_bucket",
"etcd_snap_db_save_total_duration_seconds_count",
"etcd_snap_db_save_total_duration_seconds_sum",
"etcd_snap_fsync_duration_seconds_bucket",
"etcd_snap_fsync_duration_seconds_count",
"etcd_snap_fsync_duration_seconds_sum",
"grpc_client_handled_total",
"grpc_client_msg_received_total",
"grpc_client_msg_sent_total",
"grpc_client_started_total",
"grpc_server_handled_total",
"grpc_server_handling_seconds_bucket",
"grpc_server_handling_seconds_count",
"grpc_server_handling_seconds_sum",
"grpc_server_msg_received_total",
"grpc_server_msg_sent_total",
"grpc_server_started_total",
}

// Get the list of generated metrics
generatedMetrics := getMetricsList(t, url)
for _, metric := range expectedMetrics {
if !contains(generatedMetrics, metric) {
t.Errorf("Expected metric %s not found in generated metrics", metric)
}
}
}

func getMetricsList(t *testing.T, url string) []string {
lines := getHTTPBodyAsLines(t, url)
metrics := make(map[string]struct{})
for _, line := range lines {
if strings.Contains(line, "{") {
metric := line[:strings.Index(line, "{")]
metrics[metric] = struct{}{}
} else {
metric := line[:strings.Index(line, " ")]
metrics[metric] = struct{}{}
}
}
var metricList []string
for metric := range metrics {
metricList = append(metricList, metric)
}
return metricList
}

func contains(slice []string, item string) bool {
for _, s := range slice {
if s == item {
return true
}
}
return false
}

0 comments on commit f1e0118

Please sign in to comment.