diff --git a/blockpoller/init_test.go b/blockpoller/init_test.go index 75b5545..68b9e9b 100644 --- a/blockpoller/init_test.go +++ b/blockpoller/init_test.go @@ -90,8 +90,6 @@ func newTestBlockFinalizer(t *testing.T, fireBlocks []*pbbstream.Block) *TestBlo } func (t *TestBlockFinalizer) Init() { - //TODO implement me - panic("implement me") } func (t *TestBlockFinalizer) Handle(blk *pbbstream.Block) error { diff --git a/blockpoller/poller.go b/blockpoller/poller.go index a62244b..070276f 100644 --- a/blockpoller/poller.go +++ b/blockpoller/poller.go @@ -8,6 +8,7 @@ import ( "time" "github.com/streamingfast/bstream" + "github.com/streamingfast/bstream/forkable" pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" "github.com/streamingfast/derr" @@ -68,33 +69,24 @@ func New( return b } -func (p *BlockPoller) Run(ctx context.Context, startBlockNum uint64, blockFetchBatchSize int) error { - p.startBlockNumGate = startBlockNum +func (p *BlockPoller) Run(ctx context.Context, firstStreamableBlockNum uint64, blockFetchBatchSize int) error { + p.startBlockNumGate = firstStreamableBlockNum p.logger.Info("starting poller", - zap.Uint64("start_block_num", startBlockNum), + zap.Uint64("first_streamable_block", firstStreamableBlockNum), zap.Uint64("block_fetch_batch_size", uint64(blockFetchBatchSize)), ) p.blockHandler.Init() - for { - startBlock, skip, err := p.blockFetcher.Fetch(ctx, startBlockNum) - if err != nil { - return fmt.Errorf("unable to fetch start block %d: %w", startBlockNum, err) - } - if skip { - startBlockNum++ - continue - } - return p.run(startBlock.AsRef(), blockFetchBatchSize) - } -} - -func (p *BlockPoller) run(resolvedStartBlock bstream.BlockRef, numberOfBlockToFetch int) (err error) { - p.forkDB, resolvedStartBlock, err = initState(resolvedStartBlock, p.stateStorePath, p.ignoreCursor, p.logger) + forkDB, resolvedStartBlock, err := p.initState(firstStreamableBlockNum, p.stateStorePath, p.ignoreCursor, p.logger) if err != nil { return fmt.Errorf("unable to initialize cursor: %w", err) } + p.forkDB = forkDB + return p.run(resolvedStartBlock, blockFetchBatchSize) +} + +func (p *BlockPoller) run(resolvedStartBlock bstream.BlockRef, blockFetchBatchSize int) (err error) { currentCursor := &cursor{state: ContinuousSegState, logger: p.logger} blockToFetch := resolvedStartBlock.Num() var hashToFetch *string @@ -110,7 +102,7 @@ func (p *BlockPoller) run(resolvedStartBlock bstream.BlockRef, numberOfBlockToFe } else { for { - requestedBlockItem := p.requestBlock(blockToFetch, numberOfBlockToFetch) + requestedBlockItem := p.requestBlock(blockToFetch, blockFetchBatchSize) fetchedBlockItem, ok := <-requestedBlockItem if !ok { p.logger.Info("requested block channel was closed, quitting") @@ -123,7 +115,6 @@ func (p *BlockPoller) run(resolvedStartBlock bstream.BlockRef, numberOfBlockToFe p.logger.Info("block was skipped", zap.Uint64("block_num", fetchedBlockItem.blockNumber)) blockToFetch++ - } } diff --git a/blockpoller/poller_test.go b/blockpoller/poller_test.go index a267ad6..5821588 100644 --- a/blockpoller/poller_test.go +++ b/blockpoller/poller_test.go @@ -1,30 +1,31 @@ package blockpoller import ( + "context" "errors" "fmt" "strconv" "testing" - "github.com/stretchr/testify/require" - "github.com/streamingfast/bstream" "github.com/streamingfast/bstream/forkable" pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestForkHandler_run(t *testing.T) { tests := []struct { - name string - startBlock bstream.BlockRef - blocks []*TestBlock - expectFireBlock []*pbbstream.Block + name string + firstStreamableBlock bstream.BlockRef + blocks []*TestBlock + expectFireBlock []*pbbstream.Block }{ { - name: "start block 0", - startBlock: blk("0a", "", 0).AsRef(), + name: "start block 0", + firstStreamableBlock: blk("0a", "", 0).AsRef(), blocks: []*TestBlock{ + tb("0a", "", 0), //init the state fetch tb("0a", "", 0), tb("1a", "0a", 0), tb("2a", "1a", 0), @@ -36,9 +37,10 @@ func TestForkHandler_run(t *testing.T) { }, }, { - name: "Fork 1", - startBlock: blk("100a", "99a", 100).AsRef(), + name: "Fork 1", + firstStreamableBlock: blk("100a", "99a", 100).AsRef(), blocks: []*TestBlock{ + tb("100a", "99a", 100), //init the state fetch tb("100a", "99a", 100), tb("101a", "100a", 100), tb("102a", "101a", 100), @@ -67,9 +69,10 @@ func TestForkHandler_run(t *testing.T) { }, }, { - name: "Fork 2", - startBlock: blk("100a", "99a", 100).AsRef(), + name: "Fork 2", + firstStreamableBlock: blk("100a", "99a", 100).AsRef(), blocks: []*TestBlock{ + tb("100a", "99a", 100), //init the state fetch tb("100a", "99a", 100), tb("101a", "100a", 100), tb("102a", "101a", 100), @@ -94,9 +97,10 @@ func TestForkHandler_run(t *testing.T) { }, }, { - name: "with lib advancing", - startBlock: blk("100a", "99a", 100).AsRef(), + name: "with lib advancing", + firstStreamableBlock: blk("100a", "99a", 100).AsRef(), blocks: []*TestBlock{ + tb("100a", "99a", 100), //init the state fetch tb("100a", "99a", 100), tb("101a", "100a", 100), tb("102a", "101a", 100), @@ -121,9 +125,10 @@ func TestForkHandler_run(t *testing.T) { }, }, { - name: "with skipping blocks", - startBlock: blk("100a", "99a", 100).AsRef(), + name: "with skipping blocks", + firstStreamableBlock: blk("100a", "99a", 100).AsRef(), blocks: []*TestBlock{ + tb("100a", "99a", 100), //init the state fetch tb("100a", "99a", 100), tb("101a", "100a", 100), tb("102a", "101a", 100), @@ -158,7 +163,7 @@ func TestForkHandler_run(t *testing.T) { poller.fetchBlockRetryCount = 0 poller.forkDB = forkable.NewForkDB() - err := poller.run(tt.startBlock, 1) + err := poller.Run(context.Background(), tt.firstStreamableBlock.Num(), 1) if !errors.Is(err, TestErrCompleteDone) { require.NoError(t, err) } diff --git a/blockpoller/state_file.go b/blockpoller/state_file.go index a5b0ef8..44320db 100644 --- a/blockpoller/state_file.go +++ b/blockpoller/state_file.go @@ -1,15 +1,15 @@ package blockpoller import ( + "context" "encoding/json" "fmt" "os" "path/filepath" - pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" - "github.com/streamingfast/bstream" "github.com/streamingfast/bstream/forkable" + pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" "go.uber.org/zap" ) @@ -33,20 +33,36 @@ type stateFile struct { Blocks []blockRefWithPrev } +func (p *BlockPoller) isStateFileExist(stateStorePath string) bool { + if stateStorePath == "" { + p.logger.Info("No state store path set, skipping cursor check") + return false + } + fp := filepath.Join(stateStorePath, "cursor.json") + _, err := os.Stat(fp) + exist := err == nil || os.IsExist(err) + p.logger.Info("cursor file check", + zap.String("state_store_path", stateStorePath), + zap.Bool("exist", exist), + zap.Error(err), + ) + return exist +} + func getState(stateStorePath string) (*stateFile, error) { if stateStorePath == "" { return nil, fmt.Errorf("no cursor store path set") } - filepath := filepath.Join(stateStorePath, "cursor.json") - file, err := os.Open(filepath) + fp := filepath.Join(stateStorePath, "cursor.json") + file, err := os.Open(fp) if err != nil { - return nil, fmt.Errorf("unable to open cursor file %s: %w", filepath, err) + return nil, fmt.Errorf("unable to open cursor file %s: %w", fp, err) } sf := stateFile{} decoder := json.NewDecoder(file) if err := decoder.Decode(&sf); err != nil { - return nil, fmt.Errorf("feailed to decode cursor file %s: %w", filepath, err) + return nil, fmt.Errorf("feailed to decode cursor file %s: %w", fp, err) } return &sf, nil } @@ -92,30 +108,36 @@ func (p *BlockPoller) saveState(blocks []*forkable.Block) error { return nil } -func initState(resolvedStartBlock bstream.BlockRef, stateStorePath string, ignoreCursor bool, logger *zap.Logger) (*forkable.ForkDB, bstream.BlockRef, error) { +func (p *BlockPoller) initState(firstStreamableBlockNum uint64, stateStorePath string, ignoreCursor bool, logger *zap.Logger) (*forkable.ForkDB, bstream.BlockRef, error) { forkDB := forkable.NewForkDB(forkable.ForkDBWithLogger(logger)) - useStartBlockFunc := func() (*forkable.ForkDB, bstream.BlockRef, error) { - forkDB.InitLIB(resolvedStartBlock) - return forkDB, resolvedStartBlock, nil - } - - if ignoreCursor { - logger.Info("ignorign cursor", - zap.Stringer("start_block", resolvedStartBlock), - zap.Stringer("lib", resolvedStartBlock), - ) - return useStartBlockFunc() + if ignoreCursor || !p.isStateFileExist(stateStorePath) { + logger.Info("ignoring cursor, fetching first streamable block", zap.Uint64("first_streamable_block", firstStreamableBlockNum)) + + for { + firstStreamableBlock, skip, err := p.blockFetcher.Fetch(context.Background(), firstStreamableBlockNum) + firstStreamableBlockRef := firstStreamableBlock.AsRef() + if err != nil { + p.logger.Warn("fetching first streamable block", zap.Uint64("first_streamable_block", firstStreamableBlockNum), zap.Error(err)) + continue + } + if skip { + return nil, nil, fmt.Errorf("expecting first streamable block %q not to be skiped", firstStreamableBlockRef) + } + + logger.Info("ignoring cursor, will start from...", + zap.Stringer("first_streamable_block", firstStreamableBlockRef), + zap.Stringer("lib", firstStreamableBlockRef), + ) + forkDB.InitLIB(firstStreamableBlockRef) + + return forkDB, firstStreamableBlockRef, nil + } } - sf, err := getState(stateStorePath) + sf, err := getState(stateStorePath) //at this point we expect the stateFile to exist ... if err != nil { - logger.Warn("unable to load cursor file, initializing a new forkdb", - zap.Stringer("start_block", resolvedStartBlock), - zap.Stringer("lib", resolvedStartBlock), - zap.Error(err), - ) - return useStartBlockFunc() + return nil, nil, fmt.Errorf("loading cursor: %w", err) } forkDB.InitLIB(bstream.NewBlockRef(sf.Lib.Id, sf.Lib.Num)) diff --git a/blockpoller/state_file_test.go b/blockpoller/state_file_test.go index 3e410c8..266d190 100644 --- a/blockpoller/state_file_test.go +++ b/blockpoller/state_file_test.go @@ -42,10 +42,13 @@ func TestFireBlockFinalizer_state(t *testing.T) { expectedStateFileCnt := `{"Lib":{"id":"101a","num":101},"LastFiredBlock":{"id":"105a","num":105,"previous_ref_id":"104a"},"Blocks":[{"id":"101a","num":101,"previous_ref_id":"100a"},{"id":"102a","num":102,"previous_ref_id":"101a"},{"id":"103a","num":103,"previous_ref_id":"102a"},{"id":"104a","num":104,"previous_ref_id":"103a"},{"id":"105a","num":105,"previous_ref_id":"104a"}]}` + blockFetcher := newTestBlockFetcher(t, []*TestBlock{tb("60a", "59a", 60)}) + poller := &BlockPoller{ stateStorePath: dirName, forkDB: fk, logger: zap.NewNop(), + blockFetcher: blockFetcher, } require.NoError(t, poller.saveState(expectedBlocks)) @@ -54,7 +57,7 @@ func TestFireBlockFinalizer_state(t *testing.T) { require.NoError(t, err) assert.Equal(t, expectedStateFileCnt, string(cnt)) - forkDB, startBlock, err := initState(bstream.NewBlockRef("60a", 60), dirName, false, zap.NewNop()) + forkDB, startBlock, err := poller.initState(60, dirName, false, zap.NewNop()) require.NoError(t, err) blocks, reachedLib := forkDB.CompleteSegment(bstream.NewBlockRef("105a", 105)) @@ -70,7 +73,14 @@ func TestFireBlockFinalizer_noSstate(t *testing.T) { require.NoError(t, err) defer os.Remove(dirName) - forkDB, startBlock, err := initState(bstream.NewBlockRef("60a", 60), dirName, false, logger) + blockFetcher := newTestBlockFetcher(t, []*TestBlock{tb("60a", "59a", 60)}) + poller := &BlockPoller{ + stateStorePath: dirName, + logger: zap.NewNop(), + blockFetcher: blockFetcher, + } + + forkDB, startBlock, err := poller.initState(60, dirName, false, zap.NewNop()) require.NoError(t, err) blocks, reachedLib := forkDB.CompleteSegment(bstream.NewBlockRef("60a", 60)) @@ -81,7 +91,8 @@ func TestFireBlockFinalizer_noSstate(t *testing.T) { assert.False(t, reachedLib) require.Equal(t, 0, len(blocks)) - assert.Equal(t, bstream.NewBlockRef("60a", 60), startBlock) + assert.Equal(t, bstream.NewBlockRef("60a", 60).Num(), startBlock.Num()) + assert.Equal(t, bstream.NewBlockRef("60a", 60).ID(), startBlock.ID()) } func assertForkableBlocks(t *testing.T, expected, actual []*forkable.Block) {