diff --git a/CHANGELOG.md b/CHANGELOG.md index 4090aa8..b428c37 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,14 @@ Operators, you should copy/paste content of this content straight to your projec If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you should copy the content between those 2 version to your own repository, replacing placeholder value `fire{chain}` with your chain's own binary. +## Unreleased + +* Enhanced `firecore tools print merged-blocks` with various small quality of life improvements: + - Now accepts a block range instead of a single start block. + - Passing a single block as the block range will print this single block alone. + - Block range is now optional, defaulting to run until there is no more files to read. + - It's possible to pass a merged blocks file directly, with or without an optional range. + ## v1.6.9 ### Substreams @@ -19,7 +27,7 @@ If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you s ## v1.6.8 -> [!NOTE] +> [!NOTE] > This release will reject connections from clients that don't support GZIP compression. Use `--substreams-tier1-enforce-compression=false` to keep previous behavior, then check the logs for `incoming Substreams Blocks request` logs with the value `compressed: false` to track users who are not using compressed HTTP connections. * Substreams: add `--substreams-tier1-enforce-compression` to reject connections from clients that do not support GZIP compression diff --git a/cmd/tools/print/tools_print.go b/cmd/tools/print/tools_print.go index 9afd709..8bbf304 100644 --- a/cmd/tools/print/tools_print.go +++ b/cmd/tools/print/tools_print.go @@ -15,10 +15,15 @@ package print import ( + "errors" "fmt" "io" "os" + "path" + "path/filepath" + "regexp" "strconv" + "strings" "github.com/spf13/cobra" "github.com/streamingfast/bstream" @@ -27,9 +32,10 @@ import ( "github.com/streamingfast/dstore" firecore "github.com/streamingfast/firehose-core" "github.com/streamingfast/firehose-core/types" + "go.uber.org/zap" ) -func NewToolsPrintCmd[B firecore.Block](chain *firecore.Chain[B]) *cobra.Command { +func NewToolsPrintCmd[B firecore.Block](chain *firecore.Chain[B], logger *zap.Logger) *cobra.Command { toolsPrintCmd := &cobra.Command{ Use: "print", Short: "Prints of one block or merged blocks file", @@ -42,9 +48,9 @@ func NewToolsPrintCmd[B firecore.Block](chain *firecore.Chain[B]) *cobra.Command } toolsPrintMergedBlocksCmd := &cobra.Command{ - Use: "merged-blocks ", - Short: "Prints the content summary of a merged blocks file.", - Args: cobra.ExactArgs(2), + Use: "merged-blocks [[:]]", + Short: "Prints merged blocks file from store, using range if specified", + Args: cobra.RangeArgs(1, 2), } toolsPrintCmd.AddCommand(toolsPrintOneBlockCmd) @@ -53,65 +59,100 @@ func NewToolsPrintCmd[B firecore.Block](chain *firecore.Chain[B]) *cobra.Command toolsPrintCmd.PersistentFlags().Bool("transactions", false, "When in 'text' output mode, also print transactions summary") toolsPrintOneBlockCmd.RunE = createToolsPrintOneBlockE(chain) - toolsPrintMergedBlocksCmd.RunE = createToolsPrintMergedBlocksE(chain) + toolsPrintMergedBlocksCmd.RunE = createToolsPrintMergedBlocksE(chain, logger) return toolsPrintCmd } -func createToolsPrintMergedBlocksE[B firecore.Block](chain *firecore.Chain[B]) firecore.CommandExecutor { +func createToolsPrintMergedBlocksE[B firecore.Block](chain *firecore.Chain[B], logger *zap.Logger) firecore.CommandExecutor { return func(cmd *cobra.Command, args []string) error { - ctx := cmd.Context() - outputPrinter, err := GetOutputPrinter(cmd, chain.BlockFileDescriptor()) cli.NoError(err, "Unable to get output printer") - storeURL := args[0] - store, err := dstore.NewDBinStore(storeURL) - if err != nil { - return fmt.Errorf("unable to create store at path %q: %w", store, err) - } + store, err := dstore.NewDBinStore(args[0]) + cli.NoError(err, "Unable to create store %q", args[0]) - startBlock, err := strconv.ParseUint(args[1], 10, 64) - if err != nil { - return fmt.Errorf("invalid base block %q: %w", args[1], err) + blockRange := types.NewOpenRange(int64(bstream.GetProtocolFirstStreamableBlock)) + if len(args) > 1 { + blockRange, err = types.GetBlockRangeFromArg(args[1]) + cli.NoError(err, "Unable to parse block range %q", args[1]) + + // If the range is open, we assume the user wants to print a single block + if blockRange.IsOpen() { + blockRange = types.NewClosedRange(blockRange.GetStartBlock(), uint64(blockRange.GetStartBlock())+1) + } + + cli.Ensure(blockRange.IsResolved(), "Invalid block range %q: print only accepts fully resolved range", blockRange) } - blockBoundary := types.RoundToBundleStartBlock(startBlock, 100) - filename := fmt.Sprintf("%010d", blockBoundary) - reader, err := store.OpenObject(ctx, filename) - if err != nil { - fmt.Printf("❌ Unable to read blocks filename %s: %s\n", filename, err) - return err + if looksLikeStoreURLFile(args[0]) { + store, err = dstore.NewDBinStore(storeURLFromFileInput(args[0])) + cli.NoError(err, "Unable to create store %q", path.Dir(args[0])) + + storeFileRange := storeURLFileLikeRange(args[0]) + if len(args) <= 1 { + // No block range explicitly specified, we will use the range from the filename + blockRange = storeFileRange + } else { + cli.Ensure(storeFileRange.Contains(uint64(blockRange.GetStartBlock()), types.EndBoundaryExclusive), "Block range %q is not contained in the store file range %q", blockRange, storeFileRange) + cli.Ensure(storeFileRange.Contains(uint64(blockRange.MustGetStopBlock()), types.EndBoundaryExclusive), "Block range %q is not contained in the store file range %q", blockRange, storeFileRange) + } } - defer reader.Close() - readerFactory, err := bstream.NewDBinBlockReader(reader) - if err != nil { - fmt.Printf("❌ Unable to read blocks filename %s: %s\n", filename, err) - return err + options := []bstream.FileSourceOption{ + bstream.FileSourceErrorOnMissingMergedBlocksFile(), + } + if !blockRange.IsOpen() { + options = append(options, bstream.FileSourceWithStopBlock(blockRange.MustGetStopBlock())) } - seenBlockCount := 0 - for { - block, err := readerFactory.Read() - if err != nil { - if err == io.EOF { - fmt.Fprintf(os.Stderr, "Total blocks: %d\n", seenBlockCount) - return nil - } - return fmt.Errorf("error receiving blocks: %w", err) + source := bstream.NewFileSource(store, uint64(blockRange.GetStartBlock()), bstream.HandlerFunc(func(blk *pbbstream.Block, obj any) error { + if !blockRange.Contains(blk.Number, types.RangeBoundaryExclusive) { + return nil } - seenBlockCount++ + return displayBlock(blk, chain, outputPrinter) + }), logger, options...) - if err := displayBlock(block, chain, outputPrinter); err != nil { - // Error is ready to be passed to the user as-is - return err - } + // Blocking call, perform the work we asked for + source.Run() + + if source.Err() != nil && !errors.Is(source.Err(), bstream.ErrStopBlockReached) { + return source.Err() } + + return nil } } +var mergedBlocksFileRegex = regexp.MustCompile(`^(\d{10})(?:.dbin(?:.zst)?)?$`) + +func looksLikeStoreURLFile(path string) bool { + base := filepath.Base(path) + + return mergedBlocksFileRegex.MatchString(base) +} + +func storeURLFileLikeRange(path string) types.BlockRange { + base := filepath.Base(path) + groups := mergedBlocksFileRegex.FindStringSubmatch(base) + if len(groups) != 2 { + panic(fmt.Sprintf("unexpected number of groups (got %d, expected 2) in regex %q match against %q", len(groups), mergedBlocksFileRegex, base)) + } + + startBlock, _ := strconv.ParseUint(groups[1], 10, 64) + return types.NewClosedRange(int64(startBlock), uint64(startBlock)+100) +} + +func storeURLFromFileInput(input string) string { + output := strings.TrimRight(input, filepath.Base(input)) + output = strings.TrimRightFunc(output, func(r rune) bool { + return r == '/' || r == '\\' + }) + + return output +} + func createToolsPrintOneBlockE[B firecore.Block](chain *firecore.Chain[B]) firecore.CommandExecutor { return func(cmd *cobra.Command, args []string) error { ctx := cmd.Context() @@ -177,7 +218,7 @@ func displayBlock[B firecore.Block](pbBlock *pbbstream.Block, chain *firecore.Ch } if !firecore.UnsafeRunningFromFirecore { - // since we are running via the chain specific binary (i.e. fireeth) we can use a BlockFactory + // Since we are running via a chain specific binary (i.e. fireeth) we can use a BlockFactory marshallableBlock := chain.BlockFactory() if err := pbBlock.Payload.UnmarshalTo(marshallableBlock); err != nil { diff --git a/cmd/tools/print/tools_print_test.go b/cmd/tools/print/tools_print_test.go new file mode 100644 index 0000000..5a356cb --- /dev/null +++ b/cmd/tools/print/tools_print_test.go @@ -0,0 +1,66 @@ +// Copyright 2021 dfuse Platform Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package print + +import "testing" + +func Test_doesLookLikeStoreURLFile(t *testing.T) { + type args struct { + path string + } + tests := []struct { + name string + args args + want bool + }{ + { + name: "just block number", + args: args{"gs://bucket/path/to/file/0000000001"}, + want: true, + }, + { + name: "block number + dbin", + args: args{"gs://bucket/path/to/file/0000000001.dbin"}, + want: true, + }, + { + name: "block number + dbin +zst", + args: args{"gs://bucket/path/to/file/0000000001.dbin.zst"}, + want: true, + }, + { + name: "wrong block prefix, alone", + args: args{"gs://bucket/path/to/file/v2"}, + want: false, + }, + { + name: "wrong block prefix + dbing", + args: args{"gs://bucket/path/to/file/v2.dbin"}, + want: false, + }, + { + name: "wrong block prefix + dbin + zst", + args: args{"gs://bucket/path/to/file/v2.dbin.zst"}, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := looksLikeStoreURLFile(tt.args.path); got != tt.want { + t.Errorf("doesLookLikeStoreURLFile() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/cmd/tools/tools.go b/cmd/tools/tools.go index 0a41a01..79844f5 100644 --- a/cmd/tools/tools.go +++ b/cmd/tools/tools.go @@ -63,7 +63,7 @@ func ConfigureToolsCmd[B firecore.Block]( } ToolsCmd.AddCommand(check.NewCheckCommand(chain, logger)) - ToolsCmd.AddCommand(print2.NewToolsPrintCmd(chain)) + ToolsCmd.AddCommand(print2.NewToolsPrintCmd(chain, logger)) ToolsCmd.AddCommand(compare.NewToolsCompareBlocksCmd(chain)) ToolsCmd.AddCommand(firehose.NewToolsDownloadFromFirehoseCmd(chain, logger)) diff --git a/go.mod b/go.mod index ff9fa3e..75cd261 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/spf13/cobra v1.7.0 github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.15.0 - github.com/streamingfast/bstream v0.0.2-0.20250108204816-fd52ed027320 + github.com/streamingfast/bstream v0.0.2-0.20250114192704-6a23c67c0b4d github.com/streamingfast/cli v0.0.4-0.20241119021815-815afa473375 github.com/streamingfast/dauth v0.0.0-20240222213226-519afc16cf84 github.com/streamingfast/dbin v0.9.1-0.20231117225723-59790c798e2c diff --git a/go.sum b/go.sum index 87fd636..6555f0d 100644 --- a/go.sum +++ b/go.sum @@ -2125,8 +2125,8 @@ github.com/spf13/viper v1.15.0 h1:js3yy885G8xwJa6iOISGFwd+qlUo5AvyXb7CiihdtiU= github.com/spf13/viper v1.15.0/go.mod h1:fFcTBJxvhhzSJiZy8n+PeW6t8l+KeT/uTARa0jHOQLA= github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8= github.com/stoewer/go-strcase v1.3.0/go.mod h1:fAH5hQ5pehh+j3nZfvwdk2RgEgQjAoM8wodgtPmh1xo= -github.com/streamingfast/bstream v0.0.2-0.20250108204816-fd52ed027320 h1:2XKZH4m/ehzPDRYzim10bTcO94S+aUwp3jcICPQdox0= -github.com/streamingfast/bstream v0.0.2-0.20250108204816-fd52ed027320/go.mod h1:n5wy+Vmwp4xbjXO7B81MAkAgjnf1vJ/lI2y6hWWyFbg= +github.com/streamingfast/bstream v0.0.2-0.20250114192704-6a23c67c0b4d h1:5cGG1t9rwbAwXeTq9epU7hm6cBsC2V8DM2jVCIN6JSo= +github.com/streamingfast/bstream v0.0.2-0.20250114192704-6a23c67c0b4d/go.mod h1:n5wy+Vmwp4xbjXO7B81MAkAgjnf1vJ/lI2y6hWWyFbg= github.com/streamingfast/cli v0.0.4-0.20241119021815-815afa473375 h1:nwuFSEJtQfqTuN62WvysfAtDT4qqwQ6ghFX0i2VY1fY= github.com/streamingfast/cli v0.0.4-0.20241119021815-815afa473375/go.mod h1:qOksW3DPhHVYBo8dcYxS7K3Q09wlcOChSdopeOjLWng= github.com/streamingfast/dauth v0.0.0-20240222213226-519afc16cf84 h1:yCvuNcwQ21J4Ua6YrAmHDBx3bjK04y+ssEYBe65BXRU= diff --git a/types/block_range.go b/types/block_range.go index a51d3da..602dc8b 100644 --- a/types/block_range.go +++ b/types/block_range.go @@ -75,6 +75,14 @@ func (b BlockRange) GetStopBlockOr(defaultIfOpenRange uint64) uint64 { return *b.Stop } +func (b BlockRange) MustGetStopBlock() uint64 { + if b.IsOpen() { + panic("cannot get stop block of an open range") + } + + return *b.Stop +} + func (b BlockRange) ReprocRange() string { if !b.IsClosed() { return ""