Skip to content

Commit

Permalink
feat: add support for prometheus pushgateway (#50)
Browse files Browse the repository at this point in the history
To enhance this service's monitoring capabilities, we (Nethermind) found
it helpful to have a Pushgateway configuration that sends metrics each
time the service is executed. The Pushgateway approach is a good
solution for this scenario where the target service is not always
running, and it aligns perfectly with the scenario explained
[here](https://prometheus.io/docs/practices/pushing/).
  • Loading branch information
seanmcgary authored Aug 12, 2024
2 parents b1b9eaf + f74843c commit 62c132f
Show file tree
Hide file tree
Showing 9 changed files with 613 additions and 12 deletions.
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,18 @@ EnvVar: `EIGENLAYER_ENABLE_TRACING`

Enable/disable tracing. Defaults to `true` and will attempt to auto-detect the DataDog tracing agent from the environment.

### `--pushgateway-enabled`

EnvVar: `EIGENLAYER_PUSHGATEWAY_ENABLED`

Enable/disable pushgateway metrics collection. Defaults to `false`.

### `--pushgateway-url`

EnvVar: `EIGENLAYER_PUSHGATEWAY_URL`

The URL of the pushgateway to send metrics to.

## Updater

### `--environment`
Expand Down
2 changes: 2 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ func init() {
rootCmd.PersistentFlags().String("private-key", "", "An ethereum private key")
rootCmd.PersistentFlags().String("rewards-coordinator-address", "0x56c119bD92Af45eb74443ab14D4e93B7f5C67896", "Ethereum address of the rewards coordinator contract")
rootCmd.PersistentFlags().String("proof-store-base-url", "", "HTTP base url where data is stored")
rootCmd.PersistentFlags().Bool("pushgateway-enabled", false, "Enable/disable pushgateway metrics collection")
rootCmd.PersistentFlags().String("pushgateway-url", "", "URL to use for Pushgateway. This option is ignored if pushgateway-enable is not set.")

rootCmd.PersistentFlags().VisitAll(func(f *pflag.Flag) {
viper.BindPFlag(config.KebabToSnakeCase(f.Name), f)
Expand Down
19 changes: 17 additions & 2 deletions cmd/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package cmd
import (
"context"
"fmt"
"log"
"net/http"

"github.com/Layr-Labs/eigenlayer-rewards-updater/internal/logger"
"github.com/Layr-Labs/eigenlayer-rewards-updater/internal/metrics"
"github.com/Layr-Labs/eigenlayer-rewards-updater/pkg/chainClient"
Expand All @@ -18,8 +21,6 @@ import (
"github.com/spf13/viper"
"go.uber.org/zap"
ddTracer "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
"log"
"net/http"
)

func runUpdater(ctx context.Context, cfg *config.UpdaterConfig, logger *zap.Logger) error {
Expand Down Expand Up @@ -87,6 +88,16 @@ var updaterCmd = &cobra.Command{

s.Incr(metrics.Counter_UpdateRuns, nil, 1)

if cfg.PushgatewayEnabled {
// Init Pushgateway prometheusPusher client
err = metrics.InitPrometheusPusherClient(cfg.PushgatewayUrl, "updater")
if err != nil {
log.Fatalln(err)
}
}

metrics.IncCounterUpdateRun(metrics.CounterUpdateRunsInvoked)

logger, err := logger.NewLogger(&logger.LoggerConfig{
Debug: cfg.Debug,
})
Expand All @@ -102,6 +113,10 @@ var updaterCmd = &cobra.Command{
if err := s.Close(); err != nil {
logger.Sugar().Errorw("Failed to close statsd client", zap.Error(err))
}
// Push metrics to pushgateway at the end of the run
if err := metrics.PushToPushgateway(); err != nil {
logger.Sugar().Errorw("Failed to push metrics to pushgateway", zap.Error(err))
}
},
}

Expand Down
15 changes: 15 additions & 0 deletions compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ services:
image: eigenlayer-rewards-updater:latest
build:
dockerfile: Dockerfile
networks:
- eigenlabs-rewards-updater
command:
- "updater"
- "--debug=true"
Expand All @@ -14,3 +16,16 @@ services:
- "--proof-store-base-url=https://eigenlabs-rewards-testnet-holesky.s3.amazonaws.com"
- "--private-key=<ethereum private key (hex string, not prefixed with 0x)>"
- "--rewards-coordinator-address=0xAcc1fb458a1317E886dB376Fc8141540537E68fE"
- "--pushgateway-enabled=true"
- "--pushgateway-url=http://pushgateway:9091"
pushgateway:
image: prom/pushgateway:latest
networks:
- eigenlabs-rewards-updater
expose:
- "9091"
ports:
- "9091:9091"

networks:
eigenlabs-rewards-updater:
9 changes: 9 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/ethereum/go-ethereum v1.14.0
github.com/google/go-cmp v0.6.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.12.0
github.com/rs/zerolog v1.31.0
github.com/spf13/cobra v1.8.0
github.com/spf13/pflag v1.0.5
Expand All @@ -16,6 +17,14 @@ require (
gopkg.in/DataDog/dd-trace-go.v1 v1.64.0
)

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/prometheus/client_model v0.2.1-0.20210607210712-147c58e9608a // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
)

require (
github.com/DataDog/appsec-internal-go v1.5.0 // indirect
github.com/DataDog/datadog-agent/pkg/obfuscate v0.48.0 // indirect
Expand Down
414 changes: 414 additions & 0 deletions go.sum

Large diffs are not rendered by default.

126 changes: 126 additions & 0 deletions internal/metrics/pushgateway.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package metrics

import (
"fmt"
"io"
"net/http"
"regexp"
"strconv"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/push"
)

var (
prometheusPusherClient *PrometheusPusherClient
)

const (
CounterUpdateRunsInvoked = "invoked"
CounterUpdateRunsFailed = "failed"
CounterUpdateRunsSuccess = "success"
CounterUpdateRunsNoUpdate = "no-update"
)

// PrometheusPusherClient is a pusher for the metrics to the pushgateway.
type PrometheusPusherClient struct {
pushgatewayAddr string
pusher *push.Pusher
// Metrics
counterUpdateRuns *prometheus.CounterVec
}

// InitPrometheusPusherClient creates a new PrometheusPusherClient. The client will
// try to fetch the current metrics values from the pushgateway and initialize it
// with them. This is useful to maintain a good history of the metrics. If the metric
// is not found, it will be initialized with 0.
func InitPrometheusPusherClient(addr string, jobName string) error {
prometheusPusherClient = &PrometheusPusherClient{
pushgatewayAddr: addr,
pusher: push.New(addr, jobName),
counterUpdateRuns: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "counter_update_run_total",
}, []string{"status"}),
}
prometheusPusherClient.pusher.Collector(prometheusPusherClient.counterUpdateRuns)
return prometheusPusherClient.initCounterMetrics()
}

// IncCounterUpdateRun increments the counter_update_runs metric with the given status.
// If the Pushgateway client is not initialized, the call is ignored. To avoid ignoring
// make sure to call the InitPrometheusPusherClient function before using this function.
// The status can be one of the following: "invoked", "failed", "success", "no-update"
func IncCounterUpdateRun(status string) {
if prometheusPusherClient == nil {
return
}
prometheusPusherClient.incCounterUpdateRun(status)
}

// PushToPushgateway pushes the metrics to the pushgateway.
// If the Pushgateway client is not initialized, the call is ignored. To avoid ignoring
// make sure to call the InitPrometheusPusherClient function before using this function.
func PushToPushgateway() error {
if prometheusPusherClient == nil {
return nil
}
return prometheusPusherClient.push()
}

func (p *PrometheusPusherClient) incCounterUpdateRun(status string) {
p.counterUpdateRuns.WithLabelValues(status).Inc()
}

func (p *PrometheusPusherClient) push() error {
return p.pusher.Push()
}

func (p *PrometheusPusherClient) initCounterMetrics() error {
resp, err := http.Get(p.pushgatewayAddr + "/metrics")
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("failed to fetch metrics from pushgateway: %s", resp.Status)
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return err
}

bodyString := string(body)
initializers := []struct {
regex *regexp.Regexp
counter prometheus.Counter
}{
{
regex: regexp.MustCompile(`counter_update_run_total{.*\bstatus="failed"} (\d+)`),
counter: p.counterUpdateRuns.WithLabelValues(CounterUpdateRunsFailed),
},
{
regex: regexp.MustCompile(`counter_update_run_total{.*\bstatus="success"} (\d+)`),
counter: p.counterUpdateRuns.WithLabelValues(CounterUpdateRunsSuccess),
},
{
regex: regexp.MustCompile(`counter_update_run_total{.*\bstatus="invoked"} (\d+)`),
counter: p.counterUpdateRuns.WithLabelValues(CounterUpdateRunsInvoked),
},
{
regex: regexp.MustCompile(`counter_update_run_total{.*\bstatus="no-update"} (\d+)`),
counter: p.counterUpdateRuns.WithLabelValues(CounterUpdateRunsNoUpdate),
},
}
for _, initializer := range initializers {
matches := initializer.regex.FindStringSubmatch(bodyString)
if len(matches) > 0 {
value, err := strconv.ParseFloat(matches[1], 64)
if err != nil {
return err
}
initializer.counter.Add(value)
}
}

return nil
}
25 changes: 15 additions & 10 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@ package config

import (
"fmt"
"strings"

"github.com/pkg/errors"
"github.com/spf13/viper"
"strings"
)

type GlobalConfig struct {
Config string `mapstructure:"config"`
Debug bool `mapstructure:"debug"`
DDStatsdUrl string `mapstructure:"dd_statsd_url"`
EnableStatsd bool `mapstructure:"enable_statsd"`
EnableTracing bool `mapstructure:"enable_tracing"`
Config string `mapstructure:"config"`
Debug bool `mapstructure:"debug"`
DDStatsdUrl string `mapstructure:"dd_statsd_url"`
EnableStatsd bool `mapstructure:"enable_statsd"`
PushgatewayUrl string `mapstructure:"pushgateway_url"`
PushgatewayEnabled bool `mapstructure:"pushgateway_enabled"`
EnableTracing bool `mapstructure:"enable_tracing"`
}

type Environment int
Expand Down Expand Up @@ -102,10 +105,12 @@ func StringEnvironmentFromEnum(env Environment) (string, error) {

func GetGlobalConfig() GlobalConfig {
return GlobalConfig{
Config: viper.GetString("config"),
Debug: viper.GetBool("debug"),
DDStatsdUrl: viper.GetString("dd_statsd_url"),
EnableTracing: viper.GetBool("enable_tracing"),
Config: viper.GetString("config"),
Debug: viper.GetBool("debug"),
DDStatsdUrl: viper.GetString("dd_statsd_url"),
EnableTracing: viper.GetBool("enable_tracing"),
PushgatewayEnabled: viper.GetBool("pushgateway_enabled"),
PushgatewayUrl: viper.GetString("pushgateway_url"),
}
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/updater/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func (u *Updater) Update(ctx context.Context) (*merkletree.MerkleTree, error) {
// If most recent snapshot's timestamp is equal to the latest submitted timestamp, then we don't need to update
if lst.Equal(latestSnapshot.SnapshotDate) {
metrics.GetStatsdClient().Incr(metrics.Counter_UpdateNoUpdate, nil, 1)
metrics.IncCounterUpdateRun(metrics.CounterUpdateRunsNoUpdate)
u.logger.Sugar().Infow("latest snapshot is the most recent reward")
return nil, nil
}
Expand All @@ -87,10 +88,12 @@ func (u *Updater) Update(ctx context.Context) (*merkletree.MerkleTree, error) {
u.logger.Sugar().Infow("Calculated timestamp", zap.Int64("calculated_until_timestamp", calculatedUntilTimestamp))
if err := u.transactor.SubmitRoot(ctx, [32]byte(newRoot), uint32(calculatedUntilTimestamp)); err != nil {
metrics.GetStatsdClient().Incr(metrics.Counter_UpdateFails, nil, 1)
metrics.IncCounterUpdateRun(metrics.CounterUpdateRunsFailed)
u.logger.Sugar().Errorw("Failed to submit root", zap.Error(err))
return rewardsProofData.AccountTree, err
} else {
metrics.GetStatsdClient().Incr(metrics.Counter_UpdateSuccess, nil, 1)
metrics.IncCounterUpdateRun(metrics.CounterUpdateRunsSuccess)
}

return rewardsProofData.AccountTree, nil
Expand Down

0 comments on commit 62c132f

Please sign in to comment.