diff --git a/.gitignore b/.gitignore index c6d0140..44db5e9 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,6 @@ .envrc .env .DS_Store -firehose-data* \ No newline at end of file +firehose-data* +/firecore +/firehose.yaml diff --git a/cmd/apps/reader_node_firehose.go b/cmd/apps/reader_node_firehose.go new file mode 100644 index 0000000..42a0241 --- /dev/null +++ b/cmd/apps/reader_node_firehose.go @@ -0,0 +1,74 @@ +// Copyright 2021 dfuse Platform Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package apps + +import ( + "github.com/spf13/cobra" + "github.com/spf13/viper" + firecore "github.com/streamingfast/firehose-core" + "github.com/streamingfast/firehose-core/launcher" + nodeManager "github.com/streamingfast/firehose-core/node-manager" + "github.com/streamingfast/firehose-core/node-manager/app/firehose_reader" + "github.com/streamingfast/firehose-core/node-manager/metrics" + "github.com/streamingfast/logging" + "go.uber.org/zap" +) + +func RegisterReaderNodeFirehoseApp[B firecore.Block](chain *firecore.Chain[B], rootLog *zap.Logger) { + appLogger, appTracer := logging.PackageLogger("reader-node-firehose", chain.LoggerPackageID("reader-node-firehose")) + + launcher.RegisterApp(rootLog, &launcher.AppDef{ + ID: "reader-node-firehose", + Title: "Reader Node (Firehose)", + Description: "Blocks reading node, unmanaged, reads Firehose logs from standard input and transform them into Firehose chain specific blocks", + RegisterFlags: func(cmd *cobra.Command) error { + cmd.Flags().String("reader-node-firehose-endpoint", "", "Firehose endpoint to connect to.") + cmd.Flags().String("reader-node-firehose-state", "{data-dir}/reader/state", "State file to store the cursor from the Firehose connection in.") + cmd.Flags().String("reader-node-firehose-compression", "zstd", "Firehose compression, one of 'gzip', 'zstd' or 'none'.") + cmd.Flags().Bool("reader-node-firehose-insecure", false, "Skip TLS validation when connecting to a Firehose endpoint.") + cmd.Flags().Bool("reader-node-firehose-plaintext", false, "Connect to a Firehose endpoint using a non-encrypted, plaintext connection.") + + return nil + }, + FactoryFunc: func(runtime *launcher.Runtime) (launcher.App, error) { + sfDataDir := runtime.AbsDataDir + archiveStoreURL := firecore.MustReplaceDataDir(sfDataDir, viper.GetString("common-one-block-store-url")) + + metricID := "reader-node-firehose" + headBlockTimeDrift := metrics.NewHeadBlockTimeDrift(metricID) + headBlockNumber := metrics.NewHeadBlockNumber(metricID) + appReadiness := metrics.NewAppReadiness(metricID) + metricsAndReadinessManager := nodeManager.NewMetricsAndReadinessManager(headBlockTimeDrift, headBlockNumber, appReadiness, viper.GetDuration("reader-node-readiness-max-latency")) + + return firehose_reader.New(&firehose_reader.Config{ + GRPCAddr: viper.GetString("reader-node-grpc-listen-addr"), + OneBlocksStoreURL: archiveStoreURL, + MindReadBlocksChanCapacity: viper.GetInt("reader-node-blocks-chan-capacity"), + StartBlockNum: viper.GetUint64("reader-node-start-block-num"), + StopBlockNum: viper.GetUint64("reader-node-stop-block-num"), + WorkingDir: firecore.MustReplaceDataDir(sfDataDir, viper.GetString("reader-node-working-dir")), + OneBlockSuffix: viper.GetString("reader-node-one-block-suffix"), + + FirehoseEndpoint: viper.GetString("reader-node-firehose-endpoint"), + FirehoseStateFile: firecore.MustReplaceDataDir(sfDataDir, viper.GetString("reader-node-firehose-state")), + FirehoseInsecureConn: viper.GetBool("reader-node-firehose-insecure"), + FirehosePlaintextConn: viper.GetBool("reader-node-firehose-plaintext"), + FirehoseCompression: viper.GetString("reader-node-firehose-compression"), + }, &firehose_reader.Modules{ + MetricsAndReadinessManager: metricsAndReadinessManager, + }, appLogger, appTracer), nil + }, + }) +} diff --git a/cmd/main.go b/cmd/main.go index d0774a0..1fd3953 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -101,6 +101,7 @@ func Main[B firecore.Block](chain *firecore.Chain[B]) { registerCommonFlags(chain) apps.RegisterReaderNodeApp(chain, rootLog) apps.RegisterReaderNodeStdinApp(chain, rootLog) + apps.RegisterReaderNodeFirehoseApp(chain, rootLog) apps.RegisterMergerApp(rootLog) apps.RegisterRelayerApp(rootLog) apps.RegisterFirehoseApp(chain, rootLog) diff --git a/node-manager/app/firehose_reader/app.go b/node-manager/app/firehose_reader/app.go new file mode 100644 index 0000000..fe43509 --- /dev/null +++ b/node-manager/app/firehose_reader/app.go @@ -0,0 +1,146 @@ +// Copyright 2019 dfuse Platform Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package firehose_reader + +import ( + "fmt" + "github.com/streamingfast/bstream/blockstream" + pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" + dgrpcserver "github.com/streamingfast/dgrpc/server" + dgrpcfactory "github.com/streamingfast/dgrpc/server/factory" + nodeManager "github.com/streamingfast/firehose-core/node-manager" + "github.com/streamingfast/firehose-core/node-manager/mindreader" + "github.com/streamingfast/logging" + pbheadinfo "github.com/streamingfast/pbgo/sf/headinfo/v1" + "github.com/streamingfast/shutter" + "go.uber.org/zap" + "google.golang.org/grpc" +) + +type Config struct { + GRPCAddr string + OneBlocksStoreURL string + OneBlockSuffix string + MindReadBlocksChanCapacity int + StartBlockNum uint64 + StopBlockNum uint64 + WorkingDir string + LogToZap bool + DebugDeepMind bool + + FirehoseEndpoint string + FirehoseStateFile string + FirehosePlaintextConn bool + FirehoseInsecureConn bool + FirehoseCompression string +} + +type Modules struct { + MetricsAndReadinessManager *nodeManager.MetricsAndReadinessManager + RegisterGRPCService func(server grpc.ServiceRegistrar) error +} + +type App struct { + *shutter.Shutter + Config *Config + ReadyFunc func() + modules *Modules + zlogger *zap.Logger + tracer logging.Tracer +} + +func New(c *Config, modules *Modules, zlogger *zap.Logger, tracer logging.Tracer) *App { + n := &App{ + Shutter: shutter.New(), + Config: c, + ReadyFunc: func() {}, + modules: modules, + zlogger: zlogger, + tracer: tracer, + } + return n +} + +func (a *App) Run() error { + a.zlogger.Info("launching reader-node-firehose app (reading from firehose)", zap.Reflect("config", a.Config)) + + gs := dgrpcfactory.ServerFromOptions(dgrpcserver.WithLogger(a.zlogger)) + + blockStreamServer := blockstream.NewUnmanagedServer( + blockstream.ServerOptionWithLogger(a.zlogger), + blockstream.ServerOptionWithBuffer(1), + ) + + firehoseReader, err := NewFirehoseReader(a.Config.FirehoseEndpoint, a.Config.FirehoseCompression, a.Config.FirehoseInsecureConn, a.Config.FirehosePlaintextConn, a.zlogger) + if err != nil { + return err + } + + a.zlogger.Info("launching reader log plugin") + mindreaderLogPlugin, err := mindreader.NewMindReaderPlugin( + a.Config.OneBlocksStoreURL, + a.Config.WorkingDir, + firehoseReader.NoopConsoleReader, + a.Config.StartBlockNum, + a.Config.StopBlockNum, + a.Config.MindReadBlocksChanCapacity, + a.modules.MetricsAndReadinessManager.UpdateHeadBlock, + func(_ error) {}, + a.Config.OneBlockSuffix, + blockStreamServer, + a.zlogger, + a.tracer, + ) + if err != nil { + return err + } + + a.zlogger.Debug("configuring shutter") + mindreaderLogPlugin.OnTerminated(a.Shutdown) + a.OnTerminating(mindreaderLogPlugin.Shutdown) + + serviceRegistrar := gs.ServiceRegistrar() + pbheadinfo.RegisterHeadInfoServer(serviceRegistrar, blockStreamServer) + pbbstream.RegisterBlockStreamServer(serviceRegistrar, blockStreamServer) + + if a.modules.RegisterGRPCService != nil { + err := a.modules.RegisterGRPCService(gs.ServiceRegistrar()) + if err != nil { + return fmt.Errorf("register extra grpc service: %w", err) + } + } + gs.OnTerminated(a.Shutdown) + go gs.Launch(a.Config.GRPCAddr) + + a.zlogger.Debug("launching firehose reader") + err = firehoseReader.Launch(a.Config.StartBlockNum, a.Config.StopBlockNum, a.Config.FirehoseStateFile) + if err != nil { + return err + } + + a.zlogger.Debug("running reader log plugin") + mindreaderLogPlugin.Launch() + go a.modules.MetricsAndReadinessManager.Launch() + + return nil +} + +func (a *App) OnReady(f func()) { + a.ReadyFunc = f +} + +func (a *App) IsReady() bool { + return true +} diff --git a/node-manager/app/firehose_reader/console_reader.go b/node-manager/app/firehose_reader/console_reader.go new file mode 100644 index 0000000..1b69dce --- /dev/null +++ b/node-manager/app/firehose_reader/console_reader.go @@ -0,0 +1,114 @@ +package firehose_reader + +import ( + "context" + "errors" + "fmt" + "github.com/mostynb/go-grpc-compression/zstd" + pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" + "github.com/streamingfast/firehose-core/firehose/client" + "github.com/streamingfast/firehose-core/node-manager/mindreader" + pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2" + "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/encoding/gzip" + "os" +) + +type FirehoseReader struct { + firehoseClient pbfirehose.StreamClient + firehoseStream pbfirehose.Stream_BlocksClient + closeFunc func() error + callOpts []grpc.CallOption + zlogger *zap.Logger + cursorStateFile string +} + +func NewFirehoseReader(endpoint, compression string, insecure, plaintext bool, zlogger *zap.Logger) (*FirehoseReader, error) { + firehoseClient, closeFunc, callOpts, err := client.NewFirehoseClient(endpoint, "", "", insecure, plaintext) + if err != nil { + return nil, err + } + + switch compression { + case "gzip": + callOpts = append(callOpts, grpc.UseCompressor(gzip.Name)) + case "zstd": + callOpts = append(callOpts, grpc.UseCompressor(zstd.Name)) + case "none": + default: + return nil, fmt.Errorf("invalid compression: %q, must be one of 'gzip', 'zstd' or 'none'", compression) + } + + res := &FirehoseReader{ + firehoseClient: firehoseClient, + closeFunc: closeFunc, + callOpts: callOpts, + zlogger: zlogger, + } + + return res, nil +} + +func (f *FirehoseReader) Launch(startBlock, stopBlock uint64, cursorFile string) error { + + cursor, err := os.ReadFile(cursorFile) + if err != nil && !errors.Is(err, os.ErrNotExist) { + return fmt.Errorf("unable to read cursor file: %w", err) + } + + if len(cursor) > 0 { + f.zlogger.Info("found cursor file, ignoring start block number", zap.String("cursor", string(cursor)), zap.String("state_file", cursorFile)) + } + + stream, err := f.firehoseClient.Blocks(context.Background(), &pbfirehose.Request{ + StartBlockNum: int64(startBlock), + Cursor: string(cursor), + StopBlockNum: stopBlock, + FinalBlocksOnly: false, + }, f.callOpts...) + if err != nil { + return fmt.Errorf("failed to request block stream from Firehose: %w", err) + } + + f.firehoseStream = stream + f.cursorStateFile = cursorFile + + return nil +} + +func (f *FirehoseReader) NoopConsoleReader(_ chan string) (mindreader.ConsolerReader, error) { + return f, nil +} + +func (f *FirehoseReader) ReadBlock() (obj *pbbstream.Block, err error) { + + res, err := f.firehoseStream.Recv() + if err != nil { + return nil, err + } + + err = os.WriteFile(f.cursorStateFile, []byte(res.Cursor), 0644) + if err != nil { + return nil, fmt.Errorf("failed to write cursor to state file: %w", err) + } + + return &pbbstream.Block{ + Number: res.Metadata.Num, + Id: res.Metadata.Id, + ParentId: res.Metadata.ParentId, + Timestamp: res.Metadata.Time, + LibNum: res.Metadata.LibNum, + ParentNum: res.Metadata.ParentNum, + Payload: res.Block, + }, nil +} + +func (f *FirehoseReader) Done() <-chan interface{} { + //TODO implement me + panic("implement me") +} + +func (f *FirehoseReader) Close() error { + return f.closeFunc() +}