Refactor blockpoller initialization to use first streamable block
Replaced the start block with the first streamable block in BlockPoller to ensure accurate initialization. Added a check for the state file's existence and integrated state initialization with fetching the first streamable block. Updated tests accordingly.
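For orientation before the file diffs, here is a condensed Go sketch of the new initialization flow in state_file.go. It is an illustration, not the committed code verbatim: it assumes the surrounding blockpoller package context (imports, the BlockPoller fields, getState, isStateFileExist), trims logging detail, and the resume point returned after a successful cursor load (LastFiredBlock) is an assumption about the unchanged remainder of the function.

func (p *BlockPoller) initState(firstStreamableBlockNum uint64, stateStorePath string, ignoreCursor bool, logger *zap.Logger) (*forkable.ForkDB, bstream.BlockRef, error) {
	forkDB := forkable.NewForkDB(forkable.ForkDBWithLogger(logger))

	// Without a usable cursor, bootstrap from the first streamable block,
	// retrying until the fetcher serves it.
	if ignoreCursor || !p.isStateFileExist(stateStorePath) {
		for {
			blk, skip, err := p.blockFetcher.Fetch(context.Background(), firstStreamableBlockNum)
			if err != nil {
				logger.Warn("fetching first streamable block", zap.Error(err))
				continue
			}
			if skip {
				return nil, nil, fmt.Errorf("first streamable block %d must not be skipped", firstStreamableBlockNum)
			}
			forkDB.InitLIB(blk.AsRef())
			return forkDB, blk.AsRef(), nil
		}
	}

	// A state file exists, so the saved cursor is authoritative; a load
	// failure is now an error instead of a silent fallback to the start block.
	sf, err := getState(stateStorePath)
	if err != nil {
		return nil, nil, fmt.Errorf("loading cursor: %w", err)
	}
	forkDB.InitLIB(bstream.NewBlockRef(sf.Lib.Id, sf.Lib.Num))
	// Assumption: the unchanged tail of the real function restores sf.Blocks
	// into forkDB and resumes from the last fired block recorded in the file.
	return forkDB, bstream.NewBlockRef(sf.LastFiredBlock.Id, sf.LastFiredBlock.Num), nil
}

As the poller.go diff shows, Run no longer performs its own fetch-and-skip loop; it delegates to initState, stores the returned ForkDB on the poller, and continues with run from the resolved start block.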
billettc committed Nov 12, 2024
1 parent 32bfc76 commit 06b587c
Showing 5 changed files with 94 additions and 67 deletions.
2 changes: 0 additions & 2 deletions blockpoller/init_test.go
@@ -90,8 +90,6 @@ func newTestBlockFinalizer(t *testing.T, fireBlocks []*pbbstream.Block) *TestBlo
}

func (t *TestBlockFinalizer) Init() {
//TODO implement me
panic("implement me")
}

func (t *TestBlockFinalizer) Handle(blk *pbbstream.Block) error {
31 changes: 11 additions & 20 deletions blockpoller/poller.go
@@ -8,6 +8,7 @@ import (
"time"

"github.com/streamingfast/bstream"

"github.com/streamingfast/bstream/forkable"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/streamingfast/derr"
@@ -68,33 +69,24 @@ func New(
return b
}

func (p *BlockPoller) Run(ctx context.Context, startBlockNum uint64, blockFetchBatchSize int) error {
p.startBlockNumGate = startBlockNum
func (p *BlockPoller) Run(ctx context.Context, firstStreamableBlockNum uint64, blockFetchBatchSize int) error {
p.startBlockNumGate = firstStreamableBlockNum
p.logger.Info("starting poller",
zap.Uint64("start_block_num", startBlockNum),
zap.Uint64("first_streamable_block", firstStreamableBlockNum),
zap.Uint64("block_fetch_batch_size", uint64(blockFetchBatchSize)),
)
p.blockHandler.Init()

for {
startBlock, skip, err := p.blockFetcher.Fetch(ctx, startBlockNum)
if err != nil {
return fmt.Errorf("unable to fetch start block %d: %w", startBlockNum, err)
}
if skip {
startBlockNum++
continue
}
return p.run(startBlock.AsRef(), blockFetchBatchSize)
}
}

func (p *BlockPoller) run(resolvedStartBlock bstream.BlockRef, numberOfBlockToFetch int) (err error) {
p.forkDB, resolvedStartBlock, err = initState(resolvedStartBlock, p.stateStorePath, p.ignoreCursor, p.logger)
forkDB, resolvedStartBlock, err := p.initState(firstStreamableBlockNum, p.stateStorePath, p.ignoreCursor, p.logger)
if err != nil {
return fmt.Errorf("unable to initialize cursor: %w", err)
}
p.forkDB = forkDB

return p.run(resolvedStartBlock, blockFetchBatchSize)
}

func (p *BlockPoller) run(resolvedStartBlock bstream.BlockRef, blockFetchBatchSize int) (err error) {
currentCursor := &cursor{state: ContinuousSegState, logger: p.logger}
blockToFetch := resolvedStartBlock.Num()
var hashToFetch *string
@@ -110,7 +102,7 @@ func (p *BlockPoller) run(resolvedStartBlock bstream.BlockRef, numberOfBlockToFe
} else {

for {
requestedBlockItem := p.requestBlock(blockToFetch, numberOfBlockToFetch)
requestedBlockItem := p.requestBlock(blockToFetch, blockFetchBatchSize)
fetchedBlockItem, ok := <-requestedBlockItem
if !ok {
p.logger.Info("requested block channel was closed, quitting")
@@ -123,7 +115,6 @@ func (p *BlockPoller) run(resolvedStartBlock bstream.BlockRef, numberOfBlockToFe

p.logger.Info("block was skipped", zap.Uint64("block_num", fetchedBlockItem.blockNumber))
blockToFetch++

}
}

39 changes: 22 additions & 17 deletions blockpoller/poller_test.go
@@ -1,30 +1,31 @@
package blockpoller

import (
"context"
"errors"
"fmt"
"strconv"
"testing"

"github.com/stretchr/testify/require"

"github.com/streamingfast/bstream"
"github.com/streamingfast/bstream/forkable"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestForkHandler_run(t *testing.T) {
tests := []struct {
name string
startBlock bstream.BlockRef
blocks []*TestBlock
expectFireBlock []*pbbstream.Block
name string
firstStreamableBlock bstream.BlockRef
blocks []*TestBlock
expectFireBlock []*pbbstream.Block
}{
{
name: "start block 0",
startBlock: blk("0a", "", 0).AsRef(),
name: "start block 0",
firstStreamableBlock: blk("0a", "", 0).AsRef(),
blocks: []*TestBlock{
tb("0a", "", 0), //init the state fetch
tb("0a", "", 0),
tb("1a", "0a", 0),
tb("2a", "1a", 0),
@@ -36,9 +37,10 @@ func TestForkHandler_run(t *testing.T) {
},
},
{
name: "Fork 1",
startBlock: blk("100a", "99a", 100).AsRef(),
name: "Fork 1",
firstStreamableBlock: blk("100a", "99a", 100).AsRef(),
blocks: []*TestBlock{
tb("100a", "99a", 100), //init the state fetch
tb("100a", "99a", 100),
tb("101a", "100a", 100),
tb("102a", "101a", 100),
@@ -67,9 +69,10 @@ func TestForkHandler_run(t *testing.T) {
},
},
{
name: "Fork 2",
startBlock: blk("100a", "99a", 100).AsRef(),
name: "Fork 2",
firstStreamableBlock: blk("100a", "99a", 100).AsRef(),
blocks: []*TestBlock{
tb("100a", "99a", 100), //init the state fetch
tb("100a", "99a", 100),
tb("101a", "100a", 100),
tb("102a", "101a", 100),
@@ -94,9 +97,10 @@ func TestForkHandler_run(t *testing.T) {
},
},
{
name: "with lib advancing",
startBlock: blk("100a", "99a", 100).AsRef(),
name: "with lib advancing",
firstStreamableBlock: blk("100a", "99a", 100).AsRef(),
blocks: []*TestBlock{
tb("100a", "99a", 100), //init the state fetch
tb("100a", "99a", 100),
tb("101a", "100a", 100),
tb("102a", "101a", 100),
@@ -121,9 +125,10 @@ func TestForkHandler_run(t *testing.T) {
},
},
{
name: "with skipping blocks",
startBlock: blk("100a", "99a", 100).AsRef(),
name: "with skipping blocks",
firstStreamableBlock: blk("100a", "99a", 100).AsRef(),
blocks: []*TestBlock{
tb("100a", "99a", 100), //init the state fetch
tb("100a", "99a", 100),
tb("101a", "100a", 100),
tb("102a", "101a", 100),
@@ -158,7 +163,7 @@ func TestForkHandler_run(t *testing.T) {
poller.fetchBlockRetryCount = 0
poller.forkDB = forkable.NewForkDB()

err := poller.run(tt.startBlock, 1)
err := poller.Run(context.Background(), tt.firstStreamableBlock.Num(), 1)
if !errors.Is(err, TestErrCompleteDone) {
require.NoError(t, err)
}
72 changes: 47 additions & 25 deletions blockpoller/state_file.go
@@ -1,15 +1,15 @@
package blockpoller

import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"

pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"

"github.com/streamingfast/bstream"
"github.com/streamingfast/bstream/forkable"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"go.uber.org/zap"
)

@@ -33,20 +33,36 @@ type stateFile struct {
Blocks []blockRefWithPrev
}

func (p *BlockPoller) isStateFileExist(stateStorePath string) bool {
if stateStorePath == "" {
p.logger.Info("No state store path set, skipping cursor check")
return false
}
fp := filepath.Join(stateStorePath, "cursor.json")
_, err := os.Stat(fp)
exist := err == nil || os.IsExist(err)
p.logger.Info("cursor file check",
zap.String("state_store_path", stateStorePath),
zap.Bool("exist", exist),
zap.Error(err),
)
return exist
}

func getState(stateStorePath string) (*stateFile, error) {
if stateStorePath == "" {
return nil, fmt.Errorf("no cursor store path set")
}

filepath := filepath.Join(stateStorePath, "cursor.json")
file, err := os.Open(filepath)
fp := filepath.Join(stateStorePath, "cursor.json")
file, err := os.Open(fp)
if err != nil {
return nil, fmt.Errorf("unable to open cursor file %s: %w", filepath, err)
return nil, fmt.Errorf("unable to open cursor file %s: %w", fp, err)
}
sf := stateFile{}
decoder := json.NewDecoder(file)
if err := decoder.Decode(&sf); err != nil {
return nil, fmt.Errorf("feailed to decode cursor file %s: %w", filepath, err)
return nil, fmt.Errorf("feailed to decode cursor file %s: %w", fp, err)
}
return &sf, nil
}
@@ -92,30 +108,36 @@ func (p *BlockPoller) saveState(blocks []*forkable.Block) error {
return nil
}

func initState(resolvedStartBlock bstream.BlockRef, stateStorePath string, ignoreCursor bool, logger *zap.Logger) (*forkable.ForkDB, bstream.BlockRef, error) {
func (p *BlockPoller) initState(firstStreamableBlockNum uint64, stateStorePath string, ignoreCursor bool, logger *zap.Logger) (*forkable.ForkDB, bstream.BlockRef, error) {
forkDB := forkable.NewForkDB(forkable.ForkDBWithLogger(logger))

useStartBlockFunc := func() (*forkable.ForkDB, bstream.BlockRef, error) {
forkDB.InitLIB(resolvedStartBlock)
return forkDB, resolvedStartBlock, nil
}

if ignoreCursor {
logger.Info("ignorign cursor",
zap.Stringer("start_block", resolvedStartBlock),
zap.Stringer("lib", resolvedStartBlock),
)
return useStartBlockFunc()
if ignoreCursor || !p.isStateFileExist(stateStorePath) {
logger.Info("ignoring cursor, fetching first streamable block", zap.Uint64("first_streamable_block", firstStreamableBlockNum))

for {
firstStreamableBlock, skip, err := p.blockFetcher.Fetch(context.Background(), firstStreamableBlockNum)
if err != nil {
p.logger.Warn("fetching first streamable block", zap.Uint64("first_streamable_block", firstStreamableBlockNum), zap.Error(err))
continue
}
firstStreamableBlockRef := firstStreamableBlock.AsRef()
if skip {
return nil, nil, fmt.Errorf("expecting first streamable block %q not to be skipped", firstStreamableBlockRef)
}

logger.Info("ignoring cursor, will start from...",
zap.Stringer("first_streamable_block", firstStreamableBlockRef),
zap.Stringer("lib", firstStreamableBlockRef),
)
forkDB.InitLIB(firstStreamableBlockRef)

return forkDB, firstStreamableBlockRef, nil
}
}

sf, err := getState(stateStorePath)
sf, err := getState(stateStorePath) //at this point we expect the stateFile to exist ...
if err != nil {
logger.Warn("unable to load cursor file, initializing a new forkdb",
zap.Stringer("start_block", resolvedStartBlock),
zap.Stringer("lib", resolvedStartBlock),
zap.Error(err),
)
return useStartBlockFunc()
return nil, nil, fmt.Errorf("loading cursor: %w", err)
}

forkDB.InitLIB(bstream.NewBlockRef(sf.Lib.Id, sf.Lib.Num))
17 changes: 14 additions & 3 deletions blockpoller/state_file_test.go
@@ -42,10 +42,13 @@ func TestFireBlockFinalizer_state(t *testing.T) {

expectedStateFileCnt := `{"Lib":{"id":"101a","num":101},"LastFiredBlock":{"id":"105a","num":105,"previous_ref_id":"104a"},"Blocks":[{"id":"101a","num":101,"previous_ref_id":"100a"},{"id":"102a","num":102,"previous_ref_id":"101a"},{"id":"103a","num":103,"previous_ref_id":"102a"},{"id":"104a","num":104,"previous_ref_id":"103a"},{"id":"105a","num":105,"previous_ref_id":"104a"}]}`

blockFetcher := newTestBlockFetcher(t, []*TestBlock{tb("60a", "59a", 60)})

poller := &BlockPoller{
stateStorePath: dirName,
forkDB: fk,
logger: zap.NewNop(),
blockFetcher: blockFetcher,
}
require.NoError(t, poller.saveState(expectedBlocks))

@@ -54,7 +57,7 @@ func TestFireBlockFinalizer_state(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, expectedStateFileCnt, string(cnt))

forkDB, startBlock, err := initState(bstream.NewBlockRef("60a", 60), dirName, false, zap.NewNop())
forkDB, startBlock, err := poller.initState(60, dirName, false, zap.NewNop())
require.NoError(t, err)

blocks, reachedLib := forkDB.CompleteSegment(bstream.NewBlockRef("105a", 105))
@@ -70,7 +73,14 @@ func TestFireBlockFinalizer_noSstate(t *testing.T) {
require.NoError(t, err)
defer os.Remove(dirName)

forkDB, startBlock, err := initState(bstream.NewBlockRef("60a", 60), dirName, false, logger)
blockFetcher := newTestBlockFetcher(t, []*TestBlock{tb("60a", "59a", 60)})
poller := &BlockPoller{
stateStorePath: dirName,
logger: zap.NewNop(),
blockFetcher: blockFetcher,
}

forkDB, startBlock, err := poller.initState(60, dirName, false, zap.NewNop())
require.NoError(t, err)

blocks, reachedLib := forkDB.CompleteSegment(bstream.NewBlockRef("60a", 60))
@@ -81,7 +91,8 @@ func TestFireBlockFinalizer_noSstate(t *testing.T) {
assert.False(t, reachedLib)
require.Equal(t, 0, len(blocks))

assert.Equal(t, bstream.NewBlockRef("60a", 60), startBlock)
assert.Equal(t, bstream.NewBlockRef("60a", 60).Num(), startBlock.Num())
assert.Equal(t, bstream.NewBlockRef("60a", 60).ID(), startBlock.ID())
}

func assertForkableBlocks(t *testing.T, expected, actual []*forkable.Block) {
