Skip to content

Commit

Permalink
Enhanced firecore tools print merged-blocks with various small qual…
Browse files Browse the repository at this point in the history
…ity of life improvements

- Now accepts a block range instead of a single start block.
- Passing a single block as the block range will print this single block alone.
- Block range is now optional, defaulting to run until there is no more files to read.
- It's possible to pass a merged blocks file directly, with or without an optional range.
  • Loading branch information
maoueh committed Jan 14, 2025
1 parent 96a17bc commit 0e7e8d0
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 47 deletions.
10 changes: 9 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@ Operators, you should copy/paste content of this content straight to your projec

If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you should copy the content between those 2 version to your own repository, replacing placeholder value `fire{chain}` with your chain's own binary.

## Unreleased

* Enhanced `firecore tools print merged-blocks` with various small quality of life improvements:
- Now accepts a block range instead of a single start block.
- Passing a single block as the block range will print this single block alone.
- Block range is now optional, defaulting to run until there is no more files to read.
- It's possible to pass a merged blocks file directly, with or without an optional range.

## v1.6.9

### Substreams
Expand All @@ -19,7 +27,7 @@ If you were at `firehose-core` version `1.0.0` and are bumping to `1.1.0`, you s

## v1.6.8

> [!NOTE]
> [!NOTE]
> This release will reject connections from clients that don't support GZIP compression. Use `--substreams-tier1-enforce-compression=false` to keep previous behavior, then check the logs for `incoming Substreams Blocks request` logs with the value `compressed: false` to track users who are not using compressed HTTP connections.
* Substreams: add `--substreams-tier1-enforce-compression` to reject connections from clients that do not support GZIP compression
Expand Down
125 changes: 83 additions & 42 deletions cmd/tools/print/tools_print.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,15 @@
package print

import (
"errors"
"fmt"
"io"
"os"
"path"
"path/filepath"
"regexp"
"strconv"
"strings"

"github.com/spf13/cobra"
"github.com/streamingfast/bstream"
Expand All @@ -27,9 +32,10 @@ import (
"github.com/streamingfast/dstore"
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/firehose-core/types"
"go.uber.org/zap"
)

func NewToolsPrintCmd[B firecore.Block](chain *firecore.Chain[B]) *cobra.Command {
func NewToolsPrintCmd[B firecore.Block](chain *firecore.Chain[B], logger *zap.Logger) *cobra.Command {
toolsPrintCmd := &cobra.Command{
Use: "print",
Short: "Prints of one block or merged blocks file",
Expand All @@ -42,9 +48,9 @@ func NewToolsPrintCmd[B firecore.Block](chain *firecore.Chain[B]) *cobra.Command
}

toolsPrintMergedBlocksCmd := &cobra.Command{
Use: "merged-blocks <store> <start_block>",
Short: "Prints the content summary of a merged blocks file.",
Args: cobra.ExactArgs(2),
Use: "merged-blocks <store|file> [<start_block>[:<end_block>]]",
Short: "Prints merged blocks file from store, using range if specified",
Args: cobra.RangeArgs(1, 2),
}

toolsPrintCmd.AddCommand(toolsPrintOneBlockCmd)
Expand All @@ -53,65 +59,100 @@ func NewToolsPrintCmd[B firecore.Block](chain *firecore.Chain[B]) *cobra.Command
toolsPrintCmd.PersistentFlags().Bool("transactions", false, "When in 'text' output mode, also print transactions summary")

toolsPrintOneBlockCmd.RunE = createToolsPrintOneBlockE(chain)
toolsPrintMergedBlocksCmd.RunE = createToolsPrintMergedBlocksE(chain)
toolsPrintMergedBlocksCmd.RunE = createToolsPrintMergedBlocksE(chain, logger)

return toolsPrintCmd
}

func createToolsPrintMergedBlocksE[B firecore.Block](chain *firecore.Chain[B]) firecore.CommandExecutor {
func createToolsPrintMergedBlocksE[B firecore.Block](chain *firecore.Chain[B], logger *zap.Logger) firecore.CommandExecutor {
return func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()

outputPrinter, err := GetOutputPrinter(cmd, chain.BlockFileDescriptor())
cli.NoError(err, "Unable to get output printer")

storeURL := args[0]
store, err := dstore.NewDBinStore(storeURL)
if err != nil {
return fmt.Errorf("unable to create store at path %q: %w", store, err)
}
store, err := dstore.NewDBinStore(args[0])
cli.NoError(err, "Unable to create store %q", args[0])

startBlock, err := strconv.ParseUint(args[1], 10, 64)
if err != nil {
return fmt.Errorf("invalid base block %q: %w", args[1], err)
blockRange := types.NewOpenRange(int64(bstream.GetProtocolFirstStreamableBlock))
if len(args) > 1 {
blockRange, err = types.GetBlockRangeFromArg(args[1])
cli.NoError(err, "Unable to parse block range %q", args[1])

// If the range is open, we assume the user wants to print a single block
if blockRange.IsOpen() {
blockRange = types.NewClosedRange(blockRange.GetStartBlock(), uint64(blockRange.GetStartBlock())+1)
}

cli.Ensure(blockRange.IsResolved(), "Invalid block range %q: print only accepts fully resolved range", blockRange)
}
blockBoundary := types.RoundToBundleStartBlock(startBlock, 100)

filename := fmt.Sprintf("%010d", blockBoundary)
reader, err := store.OpenObject(ctx, filename)
if err != nil {
fmt.Printf("❌ Unable to read blocks filename %s: %s\n", filename, err)
return err
if looksLikeStoreURLFile(args[0]) {
store, err = dstore.NewDBinStore(storeURLFromFileInput(args[0]))
cli.NoError(err, "Unable to create store %q", path.Dir(args[0]))

storeFileRange := storeURLFileLikeRange(args[0])
if len(args) <= 1 {
// No block range explicitly specified, we will use the range from the filename
blockRange = storeFileRange
} else {
cli.Ensure(storeFileRange.Contains(uint64(blockRange.GetStartBlock()), types.EndBoundaryExclusive), "Block range %q is not contained in the store file range %q", blockRange, storeFileRange)
cli.Ensure(storeFileRange.Contains(uint64(blockRange.MustGetStopBlock()), types.EndBoundaryExclusive), "Block range %q is not contained in the store file range %q", blockRange, storeFileRange)
}
}
defer reader.Close()

readerFactory, err := bstream.NewDBinBlockReader(reader)
if err != nil {
fmt.Printf("❌ Unable to read blocks filename %s: %s\n", filename, err)
return err
options := []bstream.FileSourceOption{
bstream.FileSourceErrorOnMissingMergedBlocksFile(),
}
if !blockRange.IsOpen() {
options = append(options, bstream.FileSourceWithStopBlock(blockRange.MustGetStopBlock()))
}

seenBlockCount := 0
for {
block, err := readerFactory.Read()
if err != nil {
if err == io.EOF {
fmt.Fprintf(os.Stderr, "Total blocks: %d\n", seenBlockCount)
return nil
}
return fmt.Errorf("error receiving blocks: %w", err)
source := bstream.NewFileSource(store, uint64(blockRange.GetStartBlock()), bstream.HandlerFunc(func(blk *pbbstream.Block, obj any) error {
if !blockRange.Contains(blk.Number, types.RangeBoundaryExclusive) {
return nil
}

seenBlockCount++
return displayBlock(blk, chain, outputPrinter)
}), logger, options...)

if err := displayBlock(block, chain, outputPrinter); err != nil {
// Error is ready to be passed to the user as-is
return err
}
// Blocking call, perform the work we asked for
source.Run()

if source.Err() != nil && !errors.Is(source.Err(), bstream.ErrStopBlockReached) {
return source.Err()
}

return nil
}
}

var mergedBlocksFileRegex = regexp.MustCompile(`^(\d{10})(?:.dbin(?:.zst)?)?$`)

func looksLikeStoreURLFile(path string) bool {
base := filepath.Base(path)

return mergedBlocksFileRegex.MatchString(base)
}

func storeURLFileLikeRange(path string) types.BlockRange {
base := filepath.Base(path)
groups := mergedBlocksFileRegex.FindStringSubmatch(base)
if len(groups) != 2 {
panic(fmt.Sprintf("unexpected number of groups (got %d, expected 2) in regex %q match against %q", len(groups), mergedBlocksFileRegex, base))
}

startBlock, _ := strconv.ParseUint(groups[1], 10, 64)
return types.NewClosedRange(int64(startBlock), uint64(startBlock)+100)
}

func storeURLFromFileInput(input string) string {
output := strings.TrimRight(input, filepath.Base(input))
output = strings.TrimRightFunc(output, func(r rune) bool {
return r == '/' || r == '\\'
})

return output
}

func createToolsPrintOneBlockE[B firecore.Block](chain *firecore.Chain[B]) firecore.CommandExecutor {
return func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
Expand Down Expand Up @@ -177,7 +218,7 @@ func displayBlock[B firecore.Block](pbBlock *pbbstream.Block, chain *firecore.Ch
}

if !firecore.UnsafeRunningFromFirecore {
// since we are running via the chain specific binary (i.e. fireeth) we can use a BlockFactory
// Since we are running via a chain specific binary (i.e. fireeth) we can use a BlockFactory
marshallableBlock := chain.BlockFactory()

if err := pbBlock.Payload.UnmarshalTo(marshallableBlock); err != nil {
Expand Down
66 changes: 66 additions & 0 deletions cmd/tools/print/tools_print_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright 2021 dfuse Platform Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package print

import "testing"

func Test_doesLookLikeStoreURLFile(t *testing.T) {
type args struct {
path string
}
tests := []struct {
name string
args args
want bool
}{
{
name: "just block number",
args: args{"gs://bucket/path/to/file/0000000001"},
want: true,
},
{
name: "block number + dbin",
args: args{"gs://bucket/path/to/file/0000000001.dbin"},
want: true,
},
{
name: "block number + dbin +zst",
args: args{"gs://bucket/path/to/file/0000000001.dbin.zst"},
want: true,
},
{
name: "wrong block prefix, alone",
args: args{"gs://bucket/path/to/file/v2"},
want: false,
},
{
name: "wrong block prefix + dbing",
args: args{"gs://bucket/path/to/file/v2.dbin"},
want: false,
},
{
name: "wrong block prefix + dbin + zst",
args: args{"gs://bucket/path/to/file/v2.dbin.zst"},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := looksLikeStoreURLFile(tt.args.path); got != tt.want {
t.Errorf("doesLookLikeStoreURLFile() = %v, want %v", got, tt.want)
}
})
}
}
2 changes: 1 addition & 1 deletion cmd/tools/tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func ConfigureToolsCmd[B firecore.Block](
}

ToolsCmd.AddCommand(check.NewCheckCommand(chain, logger))
ToolsCmd.AddCommand(print2.NewToolsPrintCmd(chain))
ToolsCmd.AddCommand(print2.NewToolsPrintCmd(chain, logger))

ToolsCmd.AddCommand(compare.NewToolsCompareBlocksCmd(chain))
ToolsCmd.AddCommand(firehose.NewToolsDownloadFromFirehoseCmd(chain, logger))
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require (
github.com/spf13/cobra v1.7.0
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.15.0
github.com/streamingfast/bstream v0.0.2-0.20250108204816-fd52ed027320
github.com/streamingfast/bstream v0.0.2-0.20250114192704-6a23c67c0b4d
github.com/streamingfast/cli v0.0.4-0.20241119021815-815afa473375
github.com/streamingfast/dauth v0.0.0-20240222213226-519afc16cf84
github.com/streamingfast/dbin v0.9.1-0.20231117225723-59790c798e2c
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2125,8 +2125,8 @@ github.com/spf13/viper v1.15.0 h1:js3yy885G8xwJa6iOISGFwd+qlUo5AvyXb7CiihdtiU=
github.com/spf13/viper v1.15.0/go.mod h1:fFcTBJxvhhzSJiZy8n+PeW6t8l+KeT/uTARa0jHOQLA=
github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8=
github.com/stoewer/go-strcase v1.3.0/go.mod h1:fAH5hQ5pehh+j3nZfvwdk2RgEgQjAoM8wodgtPmh1xo=
github.com/streamingfast/bstream v0.0.2-0.20250108204816-fd52ed027320 h1:2XKZH4m/ehzPDRYzim10bTcO94S+aUwp3jcICPQdox0=
github.com/streamingfast/bstream v0.0.2-0.20250108204816-fd52ed027320/go.mod h1:n5wy+Vmwp4xbjXO7B81MAkAgjnf1vJ/lI2y6hWWyFbg=
github.com/streamingfast/bstream v0.0.2-0.20250114192704-6a23c67c0b4d h1:5cGG1t9rwbAwXeTq9epU7hm6cBsC2V8DM2jVCIN6JSo=
github.com/streamingfast/bstream v0.0.2-0.20250114192704-6a23c67c0b4d/go.mod h1:n5wy+Vmwp4xbjXO7B81MAkAgjnf1vJ/lI2y6hWWyFbg=
github.com/streamingfast/cli v0.0.4-0.20241119021815-815afa473375 h1:nwuFSEJtQfqTuN62WvysfAtDT4qqwQ6ghFX0i2VY1fY=
github.com/streamingfast/cli v0.0.4-0.20241119021815-815afa473375/go.mod h1:qOksW3DPhHVYBo8dcYxS7K3Q09wlcOChSdopeOjLWng=
github.com/streamingfast/dauth v0.0.0-20240222213226-519afc16cf84 h1:yCvuNcwQ21J4Ua6YrAmHDBx3bjK04y+ssEYBe65BXRU=
Expand Down
8 changes: 8 additions & 0 deletions types/block_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,14 @@ func (b BlockRange) GetStopBlockOr(defaultIfOpenRange uint64) uint64 {
return *b.Stop
}

func (b BlockRange) MustGetStopBlock() uint64 {
if b.IsOpen() {
panic("cannot get stop block of an open range")
}

return *b.Stop
}

func (b BlockRange) ReprocRange() string {
if !b.IsClosed() {
return "<Invalid Unbounded Range>"
Expand Down

0 comments on commit 0e7e8d0

Please sign in to comment.