From 52fa732a74bdeec30225411e910945e12eedebc3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Frederik=20Sch=C3=B6ll?= Date: Tue, 30 Jul 2024 12:16:25 +0200 Subject: [PATCH] print stats --- .../app/firehose_reader/console_reader.go | 11 ++++ node-manager/app/firehose_reader/metrics.go | 11 ++++ .../app/firehose_reader/reader_stats.go | 55 +++++++++++++++++++ 3 files changed, 77 insertions(+) create mode 100644 node-manager/app/firehose_reader/metrics.go create mode 100644 node-manager/app/firehose_reader/reader_stats.go diff --git a/node-manager/app/firehose_reader/console_reader.go b/node-manager/app/firehose_reader/console_reader.go index 1b69dce..514c4b0 100644 --- a/node-manager/app/firehose_reader/console_reader.go +++ b/node-manager/app/firehose_reader/console_reader.go @@ -13,6 +13,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/encoding/gzip" "os" + "time" ) type FirehoseReader struct { @@ -22,6 +23,7 @@ type FirehoseReader struct { callOpts []grpc.CallOption zlogger *zap.Logger cursorStateFile string + stats *firehoseReaderStats } func NewFirehoseReader(endpoint, compression string, insecure, plaintext bool, zlogger *zap.Logger) (*FirehoseReader, error) { @@ -45,6 +47,7 @@ func NewFirehoseReader(endpoint, compression string, insecure, plaintext bool, z closeFunc: closeFunc, callOpts: callOpts, zlogger: zlogger, + stats: newFirehoseReaderStats(), } return res, nil @@ -73,6 +76,7 @@ func (f *FirehoseReader) Launch(startBlock, stopBlock uint64, cursorFile string) f.firehoseStream = stream f.cursorStateFile = cursorFile + f.stats.StartPeriodicLogToZap(context.Background(), f.zlogger, 10*time.Second) return nil } @@ -93,6 +97,12 @@ func (f *FirehoseReader) ReadBlock() (obj *pbbstream.Block, err error) { return nil, fmt.Errorf("failed to write cursor to state file: %w", err) } + BlockReadCount.Inc() + f.stats.lastBlock = pbbstream.BlockRef{ + Num: res.Metadata.Num, + Id: res.Metadata.Id, + } + return &pbbstream.Block{ Number: res.Metadata.Num, Id: res.Metadata.Id, @@ -110,5 +120,6 @@ func (f *FirehoseReader) Done() <-chan interface{} { } func (f *FirehoseReader) Close() error { + f.stats.StopPeriodicLogToZap() return f.closeFunc() } diff --git a/node-manager/app/firehose_reader/metrics.go b/node-manager/app/firehose_reader/metrics.go new file mode 100644 index 0000000..68677cf --- /dev/null +++ b/node-manager/app/firehose_reader/metrics.go @@ -0,0 +1,11 @@ +package firehose_reader + +import "github.com/streamingfast/dmetrics" + +var metrics = dmetrics.NewSet(dmetrics.PrefixNameWith("reader_node_firehose")) + +func init() { + metrics.Register() +} + +var BlockReadCount = metrics.NewCounter("block_read_count", "The number of blocks read by the Firehose reader") diff --git a/node-manager/app/firehose_reader/reader_stats.go b/node-manager/app/firehose_reader/reader_stats.go new file mode 100644 index 0000000..2df596f --- /dev/null +++ b/node-manager/app/firehose_reader/reader_stats.go @@ -0,0 +1,55 @@ +package firehose_reader + +import ( + "context" + pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" + "github.com/streamingfast/dmetrics" + "go.uber.org/zap" + "time" +) + +type firehoseReaderStats struct { + lastBlock pbbstream.BlockRef + blockRate *dmetrics.AvgRatePromCounter + + cancelPeriodicLogger context.CancelFunc +} + +func newFirehoseReaderStats() *firehoseReaderStats { + return &firehoseReaderStats{ + lastBlock: pbbstream.BlockRef{}, + blockRate: dmetrics.MustNewAvgRateFromPromCounter(BlockReadCount, 1*time.Second, 30*time.Second, "blocks"), + } +} + +func (s *firehoseReaderStats) StartPeriodicLogToZap(ctx context.Context, logger *zap.Logger, logEach time.Duration) { + ctx, s.cancelPeriodicLogger = context.WithCancel(ctx) + + go func() { + ticker := time.NewTicker(logEach) + for { + select { + case <-ticker.C: + logger.Info("reader node statistics", s.ZapFields()...) + case <-ctx.Done(): + return + } + } + }() +} + +func (s *firehoseReaderStats) StopPeriodicLogToZap() { + if s.cancelPeriodicLogger != nil { + s.cancelPeriodicLogger() + } +} + +func (s *firehoseReaderStats) ZapFields() []zap.Field { + fields := []zap.Field{ + zap.Stringer("block_rate", s.blockRate), + zap.Uint64("last_block_num", s.lastBlock.Num), + zap.String("last_block_id", s.lastBlock.Id), + } + + return fields +}