diff --git a/CHANGELOG.md b/CHANGELOG.md index fd9eb68..886813a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -105,6 +105,8 @@ If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you s * Added `--merger-delete-threads` to customize the number of threads the merger will use to delete files. It's recommended to increase this when using Ceph as S3 storage provider to 25 or higher (due to performance issues with deletes the merger might otherwise not be able to delete one-block files fast enough). * Added `--substreams-tier2-max-concurrent-requests` to limit the number of concurrent requests to the tier2 substreams service. +* If relayer is started with a single source, it will have reduced tolerance for missing blocks. This is to prevent the relayer from falling behind when the source is not producing blocks. + ## v1.2.5 * Fixed `tools check merged-blocks` default range when `-r ` is not provided to now be `[0, +∞]` (was previously `[HEAD, +∞]`). diff --git a/cmd/apps/relayer.go b/cmd/apps/relayer.go index 3ef9284..8fd235d 100644 --- a/cmd/apps/relayer.go +++ b/cmd/apps/relayer.go @@ -25,11 +25,15 @@ func RegisterRelayerApp(rootLog *zap.Logger) { FactoryFunc: func(runtime *launcher.Runtime) (launcher.App, error) { sfDataDir := runtime.AbsDataDir + sourcesAddr := viper.GetStringSlice("relayer-source") + singleReaderMode := len(sourcesAddr) <= 1 + return relayer.New(&relayer.Config{ - SourcesAddr: viper.GetStringSlice("relayer-source"), + SourcesAddr: sourcesAddr, OneBlocksURL: firecore.MustReplaceDataDir(sfDataDir, viper.GetString("common-one-block-store-url")), GRPCListenAddr: viper.GetString("relayer-grpc-listen-addr"), MaxSourceLatency: viper.GetDuration("relayer-max-source-latency"), + SingleReaderMode: singleReaderMode, }), nil }, }) diff --git a/relayer/app/relayer/app.go b/relayer/app/relayer/app.go index 00ee91f..6187775 100644 --- a/relayer/app/relayer/app.go +++ b/relayer/app/relayer/app.go @@ -37,6 +37,7 @@ type Config struct { SourceRequestBurst int MaxSourceLatency time.Duration OneBlocksURL string + SingleReaderMode bool } func (c *Config) ZapFields() []zap.Field { @@ -92,6 +93,7 @@ func (a *App) Run() error { liveSourceFactory, oneBlocksSourceFactory, a.config.GRPCListenAddr, + a.config.SingleReaderMode, ) a.OnTerminating(a.relayer.Shutdown) diff --git a/relayer/relayer.go b/relayer/relayer.go index d572e06..e5a12f7 100644 --- a/relayer/relayer.go +++ b/relayer/relayer.go @@ -53,6 +53,7 @@ func NewRelayer( liveSourceFactory bstream.SourceFactory, oneBlocksSourceFactory bstream.SourceFromNumFactoryWithSkipFunc, grpcListenAddr string, + singleReaderMode bool, ) *Relayer { r := &Relayer{ Shutter: shutter.New(), @@ -64,14 +65,24 @@ func NewRelayer( gs := dgrpcfactory.ServerFromOptions() pbhealth.RegisterHealthServer(gs.ServiceRegistrar(), r) + options := []forkable.Option{ + forkable.EnsureAllBlocksTriggerLongestChain(), // send every forked block too + forkable.WithFilters(bstream.StepNew), + } + + if singleReaderMode { + options = append(options, forkable.WithFailOnUnlinkableBlocks(1, 10*time.Second)) + } else { + options = append(options, forkable.WithFailOnUnlinkableBlocks(20, time.Minute)) + } + forkableHub := hub.NewForkableHub( r.liveSourceFactory, r.oneBlocksSourceFactory, 10, - forkable.EnsureAllBlocksTriggerLongestChain(), // send every forked block too - forkable.WithFilters(bstream.StepNew), - forkable.WithFailOnUnlinkableBlocks(20, time.Minute), + options..., ) + r.hub = forkableHub gs.OnTerminated(r.Shutdown) r.blockStreamServer = r.hub.NewBlockstreamServer(gs)