Skip to content

Commit

Permalink
add reader-node-firehose
Browse files Browse the repository at this point in the history
  • Loading branch information
fschoell committed Sep 4, 2024
1 parent cc78521 commit 63c31e2
Show file tree
Hide file tree
Showing 5 changed files with 338 additions and 1 deletion.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,6 @@
.envrc
.env
.DS_Store
firehose-data*
firehose-data*
/firecore
/firehose.yaml
74 changes: 74 additions & 0 deletions cmd/apps/reader_node_firehose.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2021 dfuse Platform Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package apps

import (
"github.com/spf13/cobra"
"github.com/spf13/viper"
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/firehose-core/launcher"
nodeManager "github.com/streamingfast/firehose-core/node-manager"
"github.com/streamingfast/firehose-core/node-manager/app/firehose_reader"
"github.com/streamingfast/firehose-core/node-manager/metrics"
"github.com/streamingfast/logging"
"go.uber.org/zap"
)

func RegisterReaderNodeFirehoseApp[B firecore.Block](chain *firecore.Chain[B], rootLog *zap.Logger) {
appLogger, appTracer := logging.PackageLogger("reader-node-firehose", chain.LoggerPackageID("reader-node-firehose"))

launcher.RegisterApp(rootLog, &launcher.AppDef{
ID: "reader-node-firehose",
Title: "Reader Node (Firehose)",
Description: "Blocks reading node, unmanaged, reads Firehose logs from standard input and transform them into Firehose chain specific blocks",
RegisterFlags: func(cmd *cobra.Command) error {
cmd.Flags().String("reader-node-firehose-endpoint", "", "Firehose endpoint to connect to.")
cmd.Flags().String("reader-node-firehose-state", "{data-dir}/reader/state", "State file to store the cursor from the Firehose connection in.")
cmd.Flags().String("reader-node-firehose-compression", "zstd", "Firehose compression, one of 'gzip', 'zstd' or 'none'.")
cmd.Flags().Bool("reader-node-firehose-insecure", false, "Skip TLS validation when connecting to a Firehose endpoint.")
cmd.Flags().Bool("reader-node-firehose-plaintext", false, "Connect to a Firehose endpoint using a non-encrypted, plaintext connection.")

return nil
},
FactoryFunc: func(runtime *launcher.Runtime) (launcher.App, error) {
sfDataDir := runtime.AbsDataDir
archiveStoreURL := firecore.MustReplaceDataDir(sfDataDir, viper.GetString("common-one-block-store-url"))

metricID := "reader-node-firehose"
headBlockTimeDrift := metrics.NewHeadBlockTimeDrift(metricID)
headBlockNumber := metrics.NewHeadBlockNumber(metricID)
appReadiness := metrics.NewAppReadiness(metricID)
metricsAndReadinessManager := nodeManager.NewMetricsAndReadinessManager(headBlockTimeDrift, headBlockNumber, appReadiness, viper.GetDuration("reader-node-readiness-max-latency"))

return firehose_reader.New(&firehose_reader.Config{
GRPCAddr: viper.GetString("reader-node-grpc-listen-addr"),
OneBlocksStoreURL: archiveStoreURL,
MindReadBlocksChanCapacity: viper.GetInt("reader-node-blocks-chan-capacity"),
StartBlockNum: viper.GetUint64("reader-node-start-block-num"),
StopBlockNum: viper.GetUint64("reader-node-stop-block-num"),
WorkingDir: firecore.MustReplaceDataDir(sfDataDir, viper.GetString("reader-node-working-dir")),
OneBlockSuffix: viper.GetString("reader-node-one-block-suffix"),

FirehoseEndpoint: viper.GetString("reader-node-firehose-endpoint"),
FirehoseStateFile: firecore.MustReplaceDataDir(sfDataDir, viper.GetString("reader-node-firehose-state")),
FirehoseInsecureConn: viper.GetBool("reader-node-firehose-insecure"),
FirehosePlaintextConn: viper.GetBool("reader-node-firehose-plaintext"),
FirehoseCompression: viper.GetString("reader-node-firehose-compression"),
}, &firehose_reader.Modules{
MetricsAndReadinessManager: metricsAndReadinessManager,
}, appLogger, appTracer), nil
},
})
}
1 change: 1 addition & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func Main[B firecore.Block](chain *firecore.Chain[B]) {
registerCommonFlags(chain)
apps.RegisterReaderNodeApp(chain, rootLog)
apps.RegisterReaderNodeStdinApp(chain, rootLog)
apps.RegisterReaderNodeFirehoseApp(chain, rootLog)
apps.RegisterMergerApp(rootLog)
apps.RegisterRelayerApp(rootLog)
apps.RegisterFirehoseApp(chain, rootLog)
Expand Down
146 changes: 146 additions & 0 deletions node-manager/app/firehose_reader/app.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Copyright 2019 dfuse Platform Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package firehose_reader

import (
"fmt"
"github.com/streamingfast/bstream/blockstream"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
dgrpcserver "github.com/streamingfast/dgrpc/server"
dgrpcfactory "github.com/streamingfast/dgrpc/server/factory"
nodeManager "github.com/streamingfast/firehose-core/node-manager"
"github.com/streamingfast/firehose-core/node-manager/mindreader"
"github.com/streamingfast/logging"
pbheadinfo "github.com/streamingfast/pbgo/sf/headinfo/v1"
"github.com/streamingfast/shutter"
"go.uber.org/zap"
"google.golang.org/grpc"
)

type Config struct {
GRPCAddr string
OneBlocksStoreURL string
OneBlockSuffix string
MindReadBlocksChanCapacity int
StartBlockNum uint64
StopBlockNum uint64
WorkingDir string
LogToZap bool
DebugDeepMind bool

FirehoseEndpoint string
FirehoseStateFile string
FirehosePlaintextConn bool
FirehoseInsecureConn bool
FirehoseCompression string
}

type Modules struct {
MetricsAndReadinessManager *nodeManager.MetricsAndReadinessManager
RegisterGRPCService func(server grpc.ServiceRegistrar) error
}

type App struct {
*shutter.Shutter
Config *Config
ReadyFunc func()
modules *Modules
zlogger *zap.Logger
tracer logging.Tracer
}

func New(c *Config, modules *Modules, zlogger *zap.Logger, tracer logging.Tracer) *App {
n := &App{
Shutter: shutter.New(),
Config: c,
ReadyFunc: func() {},
modules: modules,
zlogger: zlogger,
tracer: tracer,
}
return n
}

func (a *App) Run() error {
a.zlogger.Info("launching reader-node-firehose app (reading from firehose)", zap.Reflect("config", a.Config))

gs := dgrpcfactory.ServerFromOptions(dgrpcserver.WithLogger(a.zlogger))

blockStreamServer := blockstream.NewUnmanagedServer(
blockstream.ServerOptionWithLogger(a.zlogger),
blockstream.ServerOptionWithBuffer(1),
)

firehoseReader, err := NewFirehoseReader(a.Config.FirehoseEndpoint, a.Config.FirehoseCompression, a.Config.FirehoseInsecureConn, a.Config.FirehosePlaintextConn, a.zlogger)
if err != nil {
return err
}

a.zlogger.Info("launching reader log plugin")
mindreaderLogPlugin, err := mindreader.NewMindReaderPlugin(
a.Config.OneBlocksStoreURL,
a.Config.WorkingDir,
firehoseReader.NoopConsoleReader,
a.Config.StartBlockNum,
a.Config.StopBlockNum,
a.Config.MindReadBlocksChanCapacity,
a.modules.MetricsAndReadinessManager.UpdateHeadBlock,
func(_ error) {},
a.Config.OneBlockSuffix,
blockStreamServer,
a.zlogger,
a.tracer,
)
if err != nil {
return err
}

a.zlogger.Debug("configuring shutter")
mindreaderLogPlugin.OnTerminated(a.Shutdown)
a.OnTerminating(mindreaderLogPlugin.Shutdown)

serviceRegistrar := gs.ServiceRegistrar()
pbheadinfo.RegisterHeadInfoServer(serviceRegistrar, blockStreamServer)
pbbstream.RegisterBlockStreamServer(serviceRegistrar, blockStreamServer)

if a.modules.RegisterGRPCService != nil {
err := a.modules.RegisterGRPCService(gs.ServiceRegistrar())
if err != nil {
return fmt.Errorf("register extra grpc service: %w", err)
}
}
gs.OnTerminated(a.Shutdown)
go gs.Launch(a.Config.GRPCAddr)

a.zlogger.Debug("launching firehose reader")
err = firehoseReader.Launch(a.Config.StartBlockNum, a.Config.StopBlockNum, a.Config.FirehoseStateFile)
if err != nil {
return err
}

a.zlogger.Debug("running reader log plugin")
mindreaderLogPlugin.Launch()
go a.modules.MetricsAndReadinessManager.Launch()

return nil
}

func (a *App) OnReady(f func()) {
a.ReadyFunc = f
}

func (a *App) IsReady() bool {
return true
}
114 changes: 114 additions & 0 deletions node-manager/app/firehose_reader/console_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package firehose_reader

import (
"context"
"errors"
"fmt"
"github.com/mostynb/go-grpc-compression/zstd"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/streamingfast/firehose-core/firehose/client"
"github.com/streamingfast/firehose-core/node-manager/mindreader"
pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding/gzip"
"os"
)

type FirehoseReader struct {
firehoseClient pbfirehose.StreamClient
firehoseStream pbfirehose.Stream_BlocksClient
closeFunc func() error
callOpts []grpc.CallOption
zlogger *zap.Logger
cursorStateFile string
}

func NewFirehoseReader(endpoint, compression string, insecure, plaintext bool, zlogger *zap.Logger) (*FirehoseReader, error) {
firehoseClient, closeFunc, callOpts, err := client.NewFirehoseClient(endpoint, "", "", insecure, plaintext)
if err != nil {
return nil, err
}

switch compression {
case "gzip":
callOpts = append(callOpts, grpc.UseCompressor(gzip.Name))
case "zstd":
callOpts = append(callOpts, grpc.UseCompressor(zstd.Name))
case "none":
default:
return nil, fmt.Errorf("invalid compression: %q, must be one of 'gzip', 'zstd' or 'none'", compression)
}

res := &FirehoseReader{
firehoseClient: firehoseClient,
closeFunc: closeFunc,
callOpts: callOpts,
zlogger: zlogger,
}

return res, nil
}

func (f *FirehoseReader) Launch(startBlock, stopBlock uint64, cursorFile string) error {

cursor, err := os.ReadFile(cursorFile)
if err != nil && !errors.Is(err, os.ErrNotExist) {
return fmt.Errorf("unable to read cursor file: %w", err)
}

if len(cursor) > 0 {
f.zlogger.Info("found cursor file, ignoring start block number", zap.String("cursor", string(cursor)), zap.String("state_file", cursorFile))
}

stream, err := f.firehoseClient.Blocks(context.Background(), &pbfirehose.Request{
StartBlockNum: int64(startBlock),
Cursor: string(cursor),
StopBlockNum: stopBlock,
FinalBlocksOnly: false,
}, f.callOpts...)
if err != nil {
return fmt.Errorf("failed to request block stream from Firehose: %w", err)
}

f.firehoseStream = stream
f.cursorStateFile = cursorFile

return nil
}

func (f *FirehoseReader) NoopConsoleReader(_ chan string) (mindreader.ConsolerReader, error) {
return f, nil
}

func (f *FirehoseReader) ReadBlock() (obj *pbbstream.Block, err error) {

res, err := f.firehoseStream.Recv()
if err != nil {
return nil, err
}

err = os.WriteFile(f.cursorStateFile, []byte(res.Cursor), 0644)
if err != nil {
return nil, fmt.Errorf("failed to write cursor to state file: %w", err)
}

return &pbbstream.Block{
Number: res.Metadata.Num,
Id: res.Metadata.Id,
ParentId: res.Metadata.ParentId,
Timestamp: res.Metadata.Time,
LibNum: res.Metadata.LibNum,
ParentNum: res.Metadata.ParentNum,
Payload: res.Block,
}, nil
}

func (f *FirehoseReader) Done() <-chan interface{} {
//TODO implement me
panic("implement me")
}

func (f *FirehoseReader) Close() error {
return f.closeFunc()
}

0 comments on commit 63c31e2

Please sign in to comment.